Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions elasticgraph-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ def decode(payload)
# return an array of ElasticGraph indexing event hashes
end
```

Decoded event hashes do not need to provide a schema version. When a version is omitted, the latest
available schema artifact version is used for validation and record preparation. Decoders may include
`schema_version` to request a specific schema artifact version.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def initialize(config:, schema_artifacts:, logger:)
end

# @param payload [String] a raw payload from the transport
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events
# @return [Array<Hash<String, Object>>] the decoded ElasticGraph indexing events. Events do not
# need to include a schema version; when omitted, the latest available schema version is used.
def decode(payload)
# :nocov: -- must return an array to satisfy Steep type checking but never called
[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,30 @@ class Factory < Support::MemoizableData.define(
def build(event)
event = prepare_event(event)

selected_json_schema_version = select_json_schema_version(event) { |failure| return failure }
requested_schema_version = schema_version_from(event)
selected_schema_version = select_schema_version(event, requested_schema_version) { |failure| return failure }
event = event.merge(SCHEMA_VERSION_KEY => requested_schema_version)

# Because the `select_json_schema_version` picks the closest-matching json schema version, the incoming
# event might not match the expected json_schema_version value in the json schema (which is a `const` field).
# This is by design, since we're picking a schema based on best-effort, so to avoid that by-design validation error,
# performing the envelope validation on a "patched" version of the event.
event_with_patched_envelope = event.merge({JSON_SCHEMA_VERSION_KEY => selected_json_schema_version})
event_for_validation = schema_artifacts.event_for_schema_version_validation(event, selected_schema_version)

if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_json_schema_version).validate_with_error_message(event_with_patched_envelope))
if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_schema_version).validate_with_error_message(event_for_validation))
return build_failed_result(event, "event payload", error_message)
end

failed_result = validate_record_returning_failure(event, selected_json_schema_version)
failed_result = validate_record_returning_failure(event, selected_schema_version)
failed_result || BuildResult.success(build_all_operations_for(
event,
record_preparer_factory.for_json_schema_version(selected_json_schema_version)
record_preparer_factory.for_schema_version(selected_schema_version)
))
end

private

def select_json_schema_version(event)
available_json_schema_versions = schema_artifacts.available_json_schema_versions
def select_schema_version(event, requested_schema_version)
available_schema_versions = schema_artifacts.available_schema_versions

requested_json_schema_version = event[JSON_SCHEMA_VERSION_KEY]

# First check that a valid value has been requested (a positive integer)
if !event.key?(JSON_SCHEMA_VERSION_KEY)
yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "Event lacks a `#{JSON_SCHEMA_VERSION_KEY}`")
elsif !requested_json_schema_version.is_a?(Integer) || requested_json_schema_version < 1
yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "#{JSON_SCHEMA_VERSION_KEY} (#{requested_json_schema_version}) must be a positive integer.")
unless requested_schema_version.is_a?(Integer) && requested_schema_version >= 1
yield build_failed_result(event, SCHEMA_VERSION_KEY, "#{SCHEMA_VERSION_KEY} (#{requested_schema_version}) must be a positive integer.")
end

# The requested version might not necessarily be available (if the publisher is deployed ahead of the indexer, or an old schema
Expand All @@ -67,46 +60,46 @@ def select_json_schema_version(event)
# the event can still be indexed.
#
# This min_by block will take the closest version in the list. If a tie occurs, the first value in the list wins. The desired
# behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available json schema versions), the higher version
# behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available schema versions), the higher version
# should be selected. So to get that behavior, the list is sorted in descending order.
#
selected_json_schema_version = available_json_schema_versions.sort.reverse.min_by { |version| (requested_json_schema_version - version).abs }
selected_schema_version = available_schema_versions.sort.reverse.min_by { |version| (requested_schema_version - version).abs }

if selected_json_schema_version != requested_json_schema_version
if selected_schema_version != requested_schema_version
logger.info({
"message_type" => "ElasticGraphMissingJSONSchemaVersion",
"message_type" => "ElasticGraphMissingSchemaVersion",
"message_id" => event["message_id"],
"event_id" => EventID.from_event(event),
"event_type" => event["type"],
"requested_json_schema_version" => requested_json_schema_version,
"selected_json_schema_version" => selected_json_schema_version
"requested_schema_version" => requested_schema_version,
"selected_schema_version" => selected_schema_version
})
end

if selected_json_schema_version.nil?
if selected_schema_version.nil?
yield build_failed_result(
event, JSON_SCHEMA_VERSION_KEY,
"Failed to select json schema version. Requested version: #{event[JSON_SCHEMA_VERSION_KEY]}. \
Available json schema versions: #{available_json_schema_versions.sort.join(", ")}"
event, SCHEMA_VERSION_KEY,
"Failed to select schema version. Requested version: #{requested_schema_version}. \
Available schema versions: #{available_schema_versions.sort.join(", ")}"
)
end

selected_json_schema_version
selected_schema_version
end

def validator(type, selected_json_schema_version)
factory = validator_factories_by_version[selected_json_schema_version] # : Support::JSONSchema::ValidatorFactory
def validator(type, selected_schema_version)
factory = validator_factories_by_version[selected_schema_version] # : Support::JSONSchema::ValidatorFactory
factory.validator_for(type)
end

def validator_factories_by_version
@validator_factories_by_version ||= ::Hash.new do |hash, json_schema_version|
@validator_factories_by_version ||= ::Hash.new do |hash, schema_version|
factory = Support::JSONSchema::ValidatorFactory.new(
schema: schema_artifacts.json_schemas_for(json_schema_version),
schema: schema_artifacts.json_schemas_for(schema_version),
sanitize_pii: true
)
factory = configure_record_validator.call(factory) if configure_record_validator
hash[json_schema_version] = factory
hash[schema_version] = factory
end
end

Expand All @@ -117,10 +110,14 @@ def prepare_event(event)
event.merge("record" => event["record"].merge("id" => event.fetch("id")))
end

def validate_record_returning_failure(event, selected_json_schema_version)
def schema_version_from(event)
event.fetch(SCHEMA_VERSION_KEY) { schema_artifacts.available_schema_versions.max }
end

def validate_record_returning_failure(event, selected_schema_version)
record = event.fetch("record")
graphql_type_name = event.fetch("type")
validator = validator(graphql_type_name, selected_json_schema_version)
validator = validator(graphql_type_name, selected_schema_version)

if (error_message = validator.validate_with_error_message(record))
build_failed_result(event, "#{graphql_type_name} record", error_message)
Expand All @@ -130,7 +127,7 @@ def validate_record_returning_failure(event, selected_json_schema_version)
def build_failed_result(event, payload_description, validation_message)
message = "Malformed #{payload_description}. #{validation_message}"

# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid JSON schema
# Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid schema
# version number in this case (which is usually required to get a `RecordPreparer` from the factory), and
# we won't wind up using the record preparer for real on these operations, anyway.
operations = build_all_operations_for(event, RecordPreparer::Identity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def calculate_latency_metrics(successful_operations, noop_results)
"message_id" => event["message_id"],
"event_type" => event.fetch("type"),
"event_id" => EventID.from_event(event).to_s,
JSON_SCHEMA_VERSION_KEY => event.fetch(JSON_SCHEMA_VERSION_KEY),
SCHEMA_VERSION_KEY => event.fetch(SCHEMA_VERSION_KEY),
"latencies_in_ms_from" => latencies_in_ms_from,
"slo_results" => slo_results,
"result" => result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
module ElasticGraph
class Indexer
class RecordPreparer
# Provides the ability to get a `RecordPreparer` for a specific JSON schema version.
# Provides the ability to get a `RecordPreparer` for a specific schema artifact version.
class Factory
def initialize(schema_artifacts)
@schema_artifacts = schema_artifacts
Expand All @@ -21,23 +21,23 @@ def initialize(schema_artifacts)
hash[type_name] = scalar_types_by_name[type_name]&.load_indexing_preparer&.extension_class
end # : ::Hash[::String, SchemaArtifacts::RuntimeMetadata::extensionClass?]

@preparers_by_json_schema_version = ::Hash.new do |hash, version|
@preparers_by_schema_version = ::Hash.new do |hash, version|
hash[version] = RecordPreparer.new(
indexing_preparer_by_scalar_type_name,
build_type_metas_from(@schema_artifacts.json_schemas_for(version))
)
end
end

# Gets the `RecordPreparer` for the given JSON schema version.
def for_json_schema_version(json_schema_version)
@preparers_by_json_schema_version[json_schema_version] # : RecordPreparer
# Gets the `RecordPreparer` for the given schema artifact version.
def for_schema_version(schema_version)
@preparers_by_schema_version[schema_version] # : RecordPreparer
end

# Gets the `RecordPreparer` for the latest JSON schema version. Intended primarily
# Gets the `RecordPreparer` for the latest schema artifact version. Intended primarily
# for use in tests for convenience.
def for_latest_json_schema_version
for_json_schema_version(@schema_artifacts.latest_json_schema_version)
def for_latest_schema_version
for_schema_version(@schema_artifacts.latest_schema_version)
end

private
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def self.upsert_event_for(record)
"id" => record.fetch("id"),
"type" => record.fetch("__typename"),
"version" => record.fetch("__version"),
"record" => record.except("__typename", "__version", "__json_schema_version"),
JSON_SCHEMA_VERSION_KEY => record.fetch("__json_schema_version")
"record" => record.except("__typename", "__version", "__schema_version"),
SCHEMA_VERSION_KEY => record.fetch("__schema_version")
}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ module ElasticGraph
@validator_factories_by_version: ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]?
def validator_factories_by_version: () -> ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]

def select_json_schema_version: (event) { (BuildResult) -> bot } -> (::Integer | bot)
def select_schema_version: (event, untyped) { (BuildResult) -> bot } -> (::Integer | bot)
def prepare_event: (event) -> event
def schema_version_from: (event) -> untyped
def validate_record_returning_failure: (event, ::Integer) -> BuildResult?
def build_failed_result: (event, ::String, ::String) -> BuildResult
def build_all_operations_for: (event, _RecordPreparer) -> ::Array[_Operation]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ module ElasticGraph
class Factory
def initialize: (schemaArtifacts) -> void

def for_json_schema_version: (::Integer) -> RecordPreparer
def for_latest_json_schema_version: () -> RecordPreparer
def for_schema_version: (::Integer) -> RecordPreparer
def for_latest_schema_version: () -> RecordPreparer

private

@schema_artifacts: schemaArtifacts
@preparers_by_json_schema_version: ::Hash[::Integer, RecordPreparer]
@preparers_by_schema_version: ::Hash[::Integer, RecordPreparer]

def build_type_metas_from: (::Hash[::String, untyped]) -> ::Array[TypeMetadata]
end
Expand Down
20 changes: 10 additions & 10 deletions elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def build_address_event_without_geolocation
end

def build_widget(json_schema_version:)
event = build_upsert_event(:widget, __json_schema_version: json_schema_version)
event = build_upsert_event(:widget, __schema_version: json_schema_version)
event.merge("record" => (yield event.fetch("record")))
end
end
Expand All @@ -116,7 +116,7 @@ def build_widget(json_schema_version:)
write_address_schema_def(json_schema_version: 2, address_extras: "t.deleted_field 'deprecated'")
dump_artifacts

event = build_upsert_event(:address, id: "abc", deprecated: "foo", __json_schema_version: 1)
event = build_upsert_event(:address, id: "abc", deprecated: "foo", __schema_version: 1)
expect(event.dig("record", "deprecated")).to eq("foo")

boot_indexer.processor.process([event], refresh_indices: true)
Expand Down Expand Up @@ -162,8 +162,8 @@ def get_address_payload(id)
# included at that part of the JSON schema. So here we verify that the factory includes that.
expect(build(:team_season)).to include(__typename: "TeamSeason")

v1_event = build_upsert_event(:team, __json_schema_version: 1)
v2_event = build_upsert_event(:team, __json_schema_version: 2)
v1_event = build_upsert_event(:team, __schema_version: 1)
v2_event = build_upsert_event(:team, __schema_version: 2)
.then { |event| ::JSON.generate(event) }
# Fix the event to align with the v2 schema, since `build_upsert_event` doesn't automatically
# know that the `__typename` should be `SeasonOfATeam` instead of `TeamSeason`.
Expand Down Expand Up @@ -200,8 +200,8 @@ def get_address_payload(id)
end
dump_artifacts

v1_event = build_upsert_event(:team, __json_schema_version: 1)
v2_event = build_upsert_event(:team, __json_schema_version: 2)
v1_event = build_upsert_event(:team, __schema_version: 1)
v2_event = build_upsert_event(:team, __schema_version: 2)

expect {
boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true)
Expand Down Expand Up @@ -244,9 +244,9 @@ def get_address_payload(id)
end
dump_artifacts

v1_event = build_upsert_event(:team, __json_schema_version: 1)
v1_event = build_upsert_event(:team, __schema_version: 1)
v1_event = ::JSON.parse(::JSON.generate(v1_event).gsub('"name":', '"full_name":'))
v2_event = build_upsert_event(:team, __json_schema_version: 2)
v2_event = build_upsert_event(:team, __schema_version: 2)

expect {
boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true)
Expand Down Expand Up @@ -288,7 +288,7 @@ def get_address_payload(id)
end
dump_artifacts

v1_event = build_upsert_event(:team, __json_schema_version: 1)
v1_event = build_upsert_event(:team, __schema_version: 1)

expect {
boot_indexer.processor.process([v1_event], refresh_indices: true)
Expand Down Expand Up @@ -323,7 +323,7 @@ def get_address_payload(id)
write_address_schema_def(json_schema_version: 2, schema_extras: 'schema.deleted_type "Team"')
dump_artifacts

v1_event = build_upsert_event(:team, __json_schema_version: 1)
v1_event = build_upsert_event(:team, __schema_version: 1)
boot_indexer.processor.process([v1_event], refresh_indices: true)

expect(search_for_ids("teams")).to be_empty
Expand Down
2 changes: 1 addition & 1 deletion elasticgraph-indexer/spec/support/indexing_preparer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
schema.object_type "Object" do |t|
t.field "scalar", scalar_type
end
end).record_preparer_factory.for_latest_json_schema_version
end).record_preparer_factory.for_latest_schema_version
end

def prepare_scalar_value(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def build_indexer_with_multiple_schema_versions(schema_versions:)

allow(artifacts).to receive(:available_json_schema_versions).and_return(json_schemas_by_version.keys.to_set)
allow(artifacts).to receive(:latest_json_schema_version).and_return(json_schemas_by_version.keys.max)
allow(artifacts).to receive(:available_schema_versions).and_return(json_schemas_by_version.keys.to_set)
allow(artifacts).to receive(:latest_schema_version).and_return(json_schemas_by_version.keys.max)
allow(artifacts).to receive(:json_schemas_for) do |version|
json_schemas_by_version.fetch(version)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def new_operation(event, update_target: nil, **overrides)

arguments = {
event: event,
prepared_record: indexer.record_preparer_factory.for_latest_json_schema_version.prepare_for_index(
prepared_record: indexer.record_preparer_factory.for_latest_schema_version.prepare_for_index(
event.fetch("type"),
event.fetch("record"),
destination_index_mapping.fetch("properties")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def script_params_for(data:, source_type:, destination_type:, indexer: build_ind
update = Update.new(
event: {"type" => source_type, "record" => data},
destination_index_def: destination_index_def,
prepared_record: indexer.record_preparer_factory.for_latest_json_schema_version.prepare_for_index(
prepared_record: indexer.record_preparer_factory.for_latest_schema_version.prepare_for_index(
source_type,
data,
destination_index_mapping.fetch("properties")
Expand Down
Loading
Loading