Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,14 @@ def new_field_source(relationship_name:, field_path:)
end
@@field_source_new = prevent_non_factory_instantiation_of(SchemaElements::FieldSource)

def new_relationship(field, cardinality:, related_type:, foreign_key:, direction:)
def new_relationship(field, cardinality:, related_type:, foreign_key:, direction:, indexing_only:)
@@relationship_new.call(
field,
cardinality: cardinality,
related_type: related_type,
foreign_key: foreign_key,
direction: direction
direction: direction,
indexing_only: indexing_only
)
end
@@relationship_new = prevent_non_factory_instantiation_of(SchemaElements::Relationship)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Copyright 2024 - 2026 Block, Inc.
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#
# frozen_string_literal: true

require "elastic_graph/errors"

module ElasticGraph
module SchemaDefinition
module Indexing
# The result of resolving a relationship chain.
#
# @private
ResolvedRelationshipChain = ::Data.define(
:root_relationship, # Relationship - the root relationship (no parent_relationship)
:path_segments # Array<PathSegment> - ordered root-to-leaf
)

# Describes how to navigate from a parent type into a nested child element.
# For list fields, `source_field_name` identifies which element to update: the element
# whose `id` matches `event[source_field_name]`. We implicitly match on the `id` field
# because ElasticGraph relationships always join on `id` via foreign keys; this could be
# made configurable in the future to support non-`id` primary keys.
# For non-list (object) fields, `source_field_name` is nil since there's no ambiguity.
#
# @private
PathSegment = ::Data.define(
:field, # Field - the field to navigate into at this level
:source_field_name # String? - field name on the source event providing the match value (nil for object fields)
)

# Resolves a chain of `parent_relationship` links from a leaf embedded type up to the
# root indexed type. Produces a `ResolvedRelationshipChain` on success, or errors
# describing what's invalid.
#
# @private
class RelationshipChainResolver
def initialize(schema_def_state:)
@schema_def_state = schema_def_state

# Lazily groups each parent type's indexing fields by their fully-unwrapped field type name,
# so `find_field_by_type` can look up candidate embedding fields without re-scanning per chain.
@indexing_fields_by_field_type_name_by_parent_type = ::Hash.new do |hash, parent_type|
hash[parent_type] = parent_type.indexing_fields_by_name_in_index.values.group_by do |field|
field.type.fully_unwrapped.name
end
end
end

# Resolves the chain starting from `starting_relationship` (which must have a `parent_ref`).
#
# Returns a tuple of [resolved_chain, errors].
# If errors is non-empty, resolved_chain will be nil.
def resolve(starting_relationship)
errors = [] # : ::Array[::String]
path_segments = [] # : ::Array[PathSegment]
visited_relationships = Set[starting_relationship]

# resolve_chain returns the chain's root relationship (the one with no parent_ref), or nil
# if it hit an error walking the chain (in which case the error is already recorded).
root_relationship = resolve_chain(starting_relationship, path_segments, errors, visited_relationships)
return [nil, errors] unless root_relationship

# A valid chain must terminate at a relationship defined on an indexed type.
root_type = root_relationship.parent_type
unless root_type.root_document_type?
errors << "The `parent_relationship` chain from #{rel_description(starting_relationship)} " \
"terminates at `#{root_type.name}`, but `#{root_type.name}` is not an indexed type. " \
"The chain must terminate at an indexed type."
return [nil, errors]
end

resolved_chain = ResolvedRelationshipChain.new(
root_relationship: root_relationship,
path_segments: path_segments.reverse # reverse so root-to-leaf order
)

[resolved_chain, errors]
end

private

# Recursively walks from leaf to root, building path segments in reverse. Returns the root
# relationship (the one with no parent_ref) on success, or nil if an error was encountered.
def resolve_chain(current_rel, path_segments, errors, visited_relationships)
parent_ref = current_rel.parent_ref
return current_rel unless parent_ref

parent_rel = resolve_parent_ref(current_rel, parent_ref, errors, visited_relationships)
return nil unless parent_rel

build_path_segment(current_rel, parent_rel.parent_type, path_segments, errors)
return nil if errors.any?

visited_relationships.add(parent_rel)
resolve_chain(parent_rel, path_segments, errors, visited_relationships)
end

# Resolves a parent_ref into the concrete parent relationship.
# Returns the parent relationship on success, or appends to errors and returns nil.
def resolve_parent_ref(current_rel, ref, errors, visited_relationships)
unless current_rel.indexing_only
errors << "#{rel_description(current_rel)} uses `parent_relationship` but is not declared with " \
"`indexing_only: true`. Relationships with `parent_relationship` must be indexing-only."
return nil
end

parent_type = ref.type_ref.as_object_type # : SchemaElements::ObjectType?
unless parent_type
errors << "#{rel_description(current_rel)} references parent type " \
"`#{ref.type_ref.name}` via `parent_relationship`, but that type does not exist. Is it misspelled?"
return nil
end

parent_rel = parent_type.relationships_by_name[ref.relationship_name]
unless parent_rel
errors << "#{rel_description(current_rel)} references parent relationship " \
"`#{parent_type.name}.#{ref.relationship_name}` via `parent_relationship`, " \
"but that relationship does not exist. Is it misspelled?"
return nil
end

if visited_relationships.include?(parent_rel)
errors << "#{rel_description(current_rel)} creates a circular `parent_relationship` chain " \
"— `#{parent_type.name}.#{ref.relationship_name}` was already visited. The chain must terminate at a root indexed type."
return nil
end

current_source_type_name = current_rel.related_type.name
parent_source_type_name = parent_rel.related_type.name
unless current_source_type_name == parent_source_type_name
errors << "#{rel_description(current_rel)} relates to `#{current_source_type_name}`, " \
"but its parent relationship `#{parent_type.name}.#{ref.relationship_name}` relates to " \
"`#{parent_source_type_name}`. All relationships in a `parent_relationship` chain must relate to the same source type."
return nil
end

parent_rel
end

# Builds a PathSegment for the current level and appends it to path_segments.
# Uses the explicitly specified field name if provided, otherwise auto-discovers it.
def build_path_segment(current_rel, parent_type, path_segments, errors)
parent_ref = current_rel.parent_ref # : SchemaElements::Relationship::ParentRef
field = resolve_field(parent_ref, parent_type, current_rel, errors)
return unless field

# For list fields, `source_field_name` identifies which element to update: the one whose
# `id` matches `event[source_field_name]`. We implicitly match on `id` because ElasticGraph
# relationships always join on `id` via foreign keys. For non-list fields, it's nil since
# there's no ambiguity.
path_segments << if field.type.list?
PathSegment.new(
field: field,
source_field_name: current_rel.foreign_key
)
else
PathSegment.new(
field: field,
source_field_name: nil
)
end
end

def resolve_field(parent_ref, parent_type, current_rel, errors)
if parent_ref.field_name
field = parent_type.indexing_fields_by_name_in_index[parent_ref.field_name]
unless field
errors << "#{rel_description(current_rel)} references field `#{parent_type.name}.#{parent_ref.field_name}` " \
"via `parent_relationship`, but that field does not exist."
end
field
else
find_field_by_type(parent_type, current_rel, errors)
end
end

def find_field_by_type(parent_type, current_rel, errors)
child_type = current_rel.parent_type
matches = @indexing_fields_by_field_type_name_by_parent_type.dig(parent_type, child_type.name) || []

if matches.size > 1
field_names = matches.map(&:name).join(", ")
parent_ref = current_rel.parent_ref # : SchemaElements::Relationship::ParentRef
errors << "#{rel_description(current_rel)} has an ambiguous `parent_relationship` — " \
"`#{parent_type.name}` has multiple fields of type `#{child_type.name}` (#{field_names}). " \
"Specify which field using the `parent_field_name:` option: " \
"`r.parent_relationship \"#{parent_type.name}\", \"#{parent_ref.relationship_name}\", parent_field_name: \"<field_name>\"`"
nil
elsif matches.empty?
errors << "#{rel_description(current_rel)} declares `#{parent_type.name}` as its parent type " \
"via `parent_relationship`, but `#{parent_type.name}` has no field of type `#{child_type.name}`."
nil
else
matches.first
end
end

def rel_description(relationship)
"`#{relationship.parent_type.name}.#{relationship.name}`"
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# frozen_string_literal: true

require "elastic_graph/errors"
require "elastic_graph/schema_definition/indexing/relationship_chain_resolver"
require "elastic_graph/schema_definition/indexing/relationship_resolver"
require "elastic_graph/schema_definition/indexing/update_target_resolver"

Expand Down Expand Up @@ -48,7 +49,7 @@ def resolve_for_type(object_type, &error_reporter)
fields_with_sources_by_relationship_name = sourced_fields_by_relationship_name(object_type)
defined_relationships = object_type.relationships_by_name.keys

(defined_relationships | fields_with_sources_by_relationship_name.keys).filter_map do |relationship_name|
results = (defined_relationships | fields_with_sources_by_relationship_name.keys).filter_map do |relationship_name|
empty_fields = [] # : ::Array[SchemaElements::Field]
sourced_fields = fields_with_sources_by_relationship_name.fetch(relationship_name) { empty_fields }
relationship_resolver = RelationshipResolver.new(
Expand All @@ -65,6 +66,12 @@ def resolve_for_type(object_type, &error_reporter)
resolve_update_target(object_type, resolved_relationship, sourced_fields, &error_reporter)
end
end

# Resolve any `parent_relationship` chains on this type. For now this only surfaces
# configuration errors; later PRs will use the resolved chains to build nested update targets.
resolve_relationship_chains(object_type, &error_reporter)

results
end

def resolve_update_target(object_type, resolved_relationship, sourced_fields)
Expand All @@ -91,6 +98,19 @@ def resolve_update_target(object_type, resolved_relationship, sourced_fields)
[resolved_relationship.related_type.name, update_target] if update_target
end

def resolve_relationship_chains(object_type)
relationships_with_parent_ref = object_type.relationships_by_name.each_value.select(&:parent_ref)
return if relationships_with_parent_ref.empty?

chain_resolver = RelationshipChainResolver.new(schema_def_state: @schema_def_state)

relationships_with_parent_ref.each do |relationship|
# TODO: use resolved_chain to build nested update targets once that logic is implemented.
_resolved_chain, chain_errors = chain_resolver.resolve(relationship)
chain_errors.each { |error| yield :sourced_field, error }
end
end

def sourced_fields_by_relationship_name(object_type)
if object_type.own_index_def.nil?
# For now, only indexed types can have `sourced_from` fields, and resolving `fields_with_sources` on an unindexed union type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,40 @@ module SchemaElements
# end
# end
class Relationship < DelegateClass(Field)
# @dynamic related_type, hide_relationship_runtime_metadata, hide_relationship_runtime_metadata=
# @dynamic related_type, foreign_key, hide_relationship_runtime_metadata, hide_relationship_runtime_metadata=, parent_ref, indexing_only

# @private
ParentRef = ::Data.define(:type_ref, :relationship_name, :field_name)

# @return [ObjectType, InterfaceType, UnionType] the type this relationship relates to
attr_reader :related_type

# @return [String] the foreign key field name (the `via` parameter)
# @private
attr_reader :foreign_key

# @private
attr_accessor :hide_relationship_runtime_metadata

# @private
def initialize(field, cardinality:, related_type:, foreign_key:, direction:)
attr_reader :parent_ref

# @return [Boolean] true if this relationship is for indexing only (not exposed in GraphQL)
# @private
attr_reader :indexing_only

# @private
def initialize(field, cardinality:, related_type:, foreign_key:, direction:, indexing_only: false)
super(field)
self.hide_relationship_runtime_metadata = false
@cardinality = cardinality
@related_type = related_type
@foreign_key = foreign_key
@direction = direction
@indexing_only = indexing_only
@equivalent_field_paths_by_local_path = {}
@additional_filter = {}
@parent_ref = nil
end

# Adds additional filter conditions to a relationship beyond the foreign key.
Expand Down Expand Up @@ -136,6 +152,65 @@ def equivalent_field(path, locally_named: path)
end
end

# Indicates that this relationship chains through a parent relationship to reach the root indexed type.
#
# Use this API when defining relationships on embedded (non-indexed) types that need to use `sourced_from`
# on their fields. By chaining relationships through parent types, ElasticGraph can resolve the path from
# the nested type up to the root indexed type and properly update nested fields when source events arrive.
#
# @param parent_type_name [String] name of the parent type in the nesting hierarchy
# @param parent_relationship_name [String] name of the relationship on the parent type
# @param parent_field_name [String, nil] name of the field on the parent type that embeds this type.
# When omitted, auto-discovered by finding the field on the parent type whose type matches this type.
# Required when the parent type has multiple fields of this type.
# @return [void]
#
# @example Define a nested sourced_from relationship chain
# ElasticGraph.define_schema do |schema|
# schema.object_type "Team" do |t|
# t.field "id", "ID!"
# t.field "name", "String"
# t.field "players", "[Player!]!" do |f|
# f.mapping type: "nested"
# end
# t.relates_to_many "statLines", "StatLine", via: "teamId", dir: :in, indexing_only: true
# t.index "teams" do |i|
# i.has_had_multiple_sources!
# end
# end
#
# schema.object_type "Player" do |t|
# t.field "id", "ID!"
# t.field "name", "String"
# t.field "goalsScored", "Int" do |f|
# f.sourced_from "statLine", "goals"
# end
# t.relates_to_one "statLine", "StatLine", via: "playerId", dir: :in, indexing_only: true do |r|
# r.parent_relationship "Team", "statLines"
# end
# end
#
# schema.object_type "StatLine" do |t|
# t.field "id", "ID!"
# t.field "teamId", "ID"
# t.field "playerId", "ID"
# t.field "goals", "Int"
# t.index "stat_lines"
# end
# end
def parent_relationship(parent_type_name, parent_relationship_name, parent_field_name: nil)
if @parent_ref
raise Errors::SchemaError, "`parent_relationship` has been called multiple times on `#{parent_type.name}.#{name}`, " \
"but each relationship can have only one `parent_relationship`."
end

@parent_ref = ParentRef.new(
type_ref: schema_def_state.type_ref(parent_type_name),
relationship_name: parent_relationship_name,
field_name: parent_field_name
)
end

# Gets the `routing_value_source` from this relationship for the given `index`, based on the configured
# routing used by `index` and the configured equivalent fields.
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ def relates_to(field_name, type, via:, dir:, foreign_key_type:, cardinality:, re
cardinality: cardinality,
related_type: schema_def_state.type_ref(related_type).to_final_form,
foreign_key: via,
direction: dir
direction: dir,
indexing_only: indexing_only
)

field.relationship = relationship
Expand Down
Loading