From 609617bd4cf5db764dba1113568e4e3cd159d871 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Tue, 30 Jun 2026 18:58:44 -0500 Subject: [PATCH] Generalize indexing schema versions --- elasticgraph-indexer/README.md | 4 + .../indexer/indexing_event_decoder.rb | 3 +- .../indexer/operation/factory.rb | 71 ++++++++-------- .../lib/elastic_graph/indexer/processor.rb | 2 +- .../elastic_graph/indexer/record_preparer.rb | 16 ++-- .../indexer/test_support/converters.rb | 4 +- .../indexer/operation/factory.rbs | 3 +- .../elastic_graph/indexer/record_preparer.rbs | 6 +- .../spec/acceptance/schema_evolution_spec.rb | 20 ++--- .../spec/support/indexing_preparer.rb | 2 +- .../spec/support/multiple_version_support.rb | 2 + .../indexer/datastore_indexing_router_spec.rb | 2 +- .../operation/count_accumulator_spec.rb | 2 +- .../indexer/operation/factory_spec.rb | 83 +++++++++++-------- .../indexer/operation/update_spec.rb | 2 +- .../indexer/record_preparer_spec.rb | 22 ++--- .../indexer/test_support/converters_spec.rb | 12 +-- elasticgraph-json_ingestion/README.md | 13 +++ .../json_ingestion/indexing_event_decoder.rb | 43 ++++++++++ .../schema_definition/results_extension.rb | 17 ++++ .../json_ingestion/indexing_event_decoder.rbs | 21 +++++ .../schema_definition/results_extension.rbs | 6 ++ .../indexing_event_decoder_spec.rb | 36 ++++++++ .../schema_definition/json_schema_spec.rb | 12 ++- .../schema_artifacts/from_disk.rb | 23 +++++ .../sig/elastic_graph/schema_artifacts.rbs | 6 ++ .../schema_artifacts/from_disk_spec.rb | 25 ++++++ .../lib/elastic_graph/constants.rb | 4 + .../sig/elastic_graph/constants.rbs | 1 + elasticgraph-warehouse_lambda/README.md | 12 +-- .../warehouse_lambda/warehouse_dumper.rb | 16 ++-- .../warehouse_lambda/warehouse_dumper_spec.rb | 26 +++--- .../spec_support/builds_indexer_operation.rb | 2 +- .../elastic_graph/spec_support/factories.rb | 4 +- .../spec_support/uses_datastore.rb | 2 +- 35 files changed, 372 insertions(+), 153 deletions(-) create mode 100644 elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/indexing_event_decoder.rb create mode 100644 elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/indexing_event_decoder.rbs create mode 100644 elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/indexing_event_decoder_spec.rb diff --git a/elasticgraph-indexer/README.md b/elasticgraph-indexer/README.md index 52a3be8c9..5ac3060ef 100644 --- a/elasticgraph-indexer/README.md +++ b/elasticgraph-indexer/README.md @@ -74,3 +74,7 @@ def decode(payload) # return an array of ElasticGraph indexing event hashes end ``` + +Decoded event hashes do not need to provide a schema version. When a version is omitted, the latest +available schema artifact version is used for validation and record preparation. Decoders may include +`schema_version` to request a specific schema artifact version. diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb index 2fe8b9162..0b3ef42f0 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/indexing_event_decoder.rb @@ -24,7 +24,8 @@ def initialize(config:, schema_artifacts:, logger:) end # @param payload [String] a raw payload from the transport - # @return [Array>] the decoded ElasticGraph indexing events + # @return [Array>] the decoded ElasticGraph indexing events. Events do not + # need to include a schema version; when omitted, the latest available schema version is used. def decode(payload) # :nocov: -- must return an array to satisfy Steep type checking but never called [] diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/factory.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/factory.rb index c6b5365de..02ed79eba 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/operation/factory.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/operation/factory.rb @@ -28,37 +28,30 @@ class Factory < Support::MemoizableData.define( def build(event) event = prepare_event(event) - selected_json_schema_version = select_json_schema_version(event) { |failure| return failure } + requested_schema_version = schema_version_from(event) + selected_schema_version = select_schema_version(event, requested_schema_version) { |failure| return failure } + event = event.merge(SCHEMA_VERSION_KEY => requested_schema_version) - # Because the `select_json_schema_version` picks the closest-matching json schema version, the incoming - # event might not match the expected json_schema_version value in the json schema (which is a `const` field). - # This is by design, since we're picking a schema based on best-effort, so to avoid that by-design validation error, - # performing the envelope validation on a "patched" version of the event. - event_with_patched_envelope = event.merge({JSON_SCHEMA_VERSION_KEY => selected_json_schema_version}) + event_for_validation = schema_artifacts.event_for_schema_version_validation(event, selected_schema_version) - if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_json_schema_version).validate_with_error_message(event_with_patched_envelope)) + if (error_message = validator(EVENT_ENVELOPE_JSON_SCHEMA_NAME, selected_schema_version).validate_with_error_message(event_for_validation)) return build_failed_result(event, "event payload", error_message) end - failed_result = validate_record_returning_failure(event, selected_json_schema_version) + failed_result = validate_record_returning_failure(event, selected_schema_version) failed_result || BuildResult.success(build_all_operations_for( event, - record_preparer_factory.for_json_schema_version(selected_json_schema_version) + record_preparer_factory.for_schema_version(selected_schema_version) )) end private - def select_json_schema_version(event) - available_json_schema_versions = schema_artifacts.available_json_schema_versions + def select_schema_version(event, requested_schema_version) + available_schema_versions = schema_artifacts.available_schema_versions - requested_json_schema_version = event[JSON_SCHEMA_VERSION_KEY] - - # First check that a valid value has been requested (a positive integer) - if !event.key?(JSON_SCHEMA_VERSION_KEY) - yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "Event lacks a `#{JSON_SCHEMA_VERSION_KEY}`") - elsif !requested_json_schema_version.is_a?(Integer) || requested_json_schema_version < 1 - yield build_failed_result(event, JSON_SCHEMA_VERSION_KEY, "#{JSON_SCHEMA_VERSION_KEY} (#{requested_json_schema_version}) must be a positive integer.") + unless requested_schema_version.is_a?(Integer) && requested_schema_version >= 1 + yield build_failed_result(event, SCHEMA_VERSION_KEY, "#{SCHEMA_VERSION_KEY} (#{requested_schema_version}) must be a positive integer.") end # The requested version might not necessarily be available (if the publisher is deployed ahead of the indexer, or an old schema @@ -67,46 +60,46 @@ def select_json_schema_version(event) # the event can still be indexed. # # This min_by block will take the closest version in the list. If a tie occurs, the first value in the list wins. The desired - # behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available json schema versions), the higher version + # behavior is in the event of a tie (highly unlikely, there shouldn't be a gap in available schema versions), the higher version # should be selected. So to get that behavior, the list is sorted in descending order. # - selected_json_schema_version = available_json_schema_versions.sort.reverse.min_by { |version| (requested_json_schema_version - version).abs } + selected_schema_version = available_schema_versions.sort.reverse.min_by { |version| (requested_schema_version - version).abs } - if selected_json_schema_version != requested_json_schema_version + if selected_schema_version != requested_schema_version logger.info({ - "message_type" => "ElasticGraphMissingJSONSchemaVersion", + "message_type" => "ElasticGraphMissingSchemaVersion", "message_id" => event["message_id"], "event_id" => EventID.from_event(event), "event_type" => event["type"], - "requested_json_schema_version" => requested_json_schema_version, - "selected_json_schema_version" => selected_json_schema_version + "requested_schema_version" => requested_schema_version, + "selected_schema_version" => selected_schema_version }) end - if selected_json_schema_version.nil? + if selected_schema_version.nil? yield build_failed_result( - event, JSON_SCHEMA_VERSION_KEY, - "Failed to select json schema version. Requested version: #{event[JSON_SCHEMA_VERSION_KEY]}. \ - Available json schema versions: #{available_json_schema_versions.sort.join(", ")}" + event, SCHEMA_VERSION_KEY, + "Failed to select schema version. Requested version: #{requested_schema_version}. \ + Available schema versions: #{available_schema_versions.sort.join(", ")}" ) end - selected_json_schema_version + selected_schema_version end - def validator(type, selected_json_schema_version) - factory = validator_factories_by_version[selected_json_schema_version] # : Support::JSONSchema::ValidatorFactory + def validator(type, selected_schema_version) + factory = validator_factories_by_version[selected_schema_version] # : Support::JSONSchema::ValidatorFactory factory.validator_for(type) end def validator_factories_by_version - @validator_factories_by_version ||= ::Hash.new do |hash, json_schema_version| + @validator_factories_by_version ||= ::Hash.new do |hash, schema_version| factory = Support::JSONSchema::ValidatorFactory.new( - schema: schema_artifacts.json_schemas_for(json_schema_version), + schema: schema_artifacts.json_schemas_for(schema_version), sanitize_pii: true ) factory = configure_record_validator.call(factory) if configure_record_validator - hash[json_schema_version] = factory + hash[schema_version] = factory end end @@ -117,10 +110,14 @@ def prepare_event(event) event.merge("record" => event["record"].merge("id" => event.fetch("id"))) end - def validate_record_returning_failure(event, selected_json_schema_version) + def schema_version_from(event) + event.fetch(SCHEMA_VERSION_KEY) { schema_artifacts.available_schema_versions.max } + end + + def validate_record_returning_failure(event, selected_schema_version) record = event.fetch("record") graphql_type_name = event.fetch("type") - validator = validator(graphql_type_name, selected_json_schema_version) + validator = validator(graphql_type_name, selected_schema_version) if (error_message = validator.validate_with_error_message(record)) build_failed_result(event, "#{graphql_type_name} record", error_message) @@ -130,7 +127,7 @@ def validate_record_returning_failure(event, selected_json_schema_version) def build_failed_result(event, payload_description, validation_message) message = "Malformed #{payload_description}. #{validation_message}" - # Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid JSON schema + # Here we use the `RecordPreparer::Identity` record preparer because we may not have a valid schema # version number in this case (which is usually required to get a `RecordPreparer` from the factory), and # we won't wind up using the record preparer for real on these operations, anyway. operations = build_all_operations_for(event, RecordPreparer::Identity) diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/processor.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/processor.rb index 6656470c2..1dd23a929 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/processor.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/processor.rb @@ -125,7 +125,7 @@ def calculate_latency_metrics(successful_operations, noop_results) "message_id" => event["message_id"], "event_type" => event.fetch("type"), "event_id" => EventID.from_event(event).to_s, - JSON_SCHEMA_VERSION_KEY => event.fetch(JSON_SCHEMA_VERSION_KEY), + SCHEMA_VERSION_KEY => event.fetch(SCHEMA_VERSION_KEY), "latencies_in_ms_from" => latencies_in_ms_from, "slo_results" => slo_results, "result" => result diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/record_preparer.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/record_preparer.rb index 6e0b04e44..80f5c703a 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/record_preparer.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/record_preparer.rb @@ -11,7 +11,7 @@ module ElasticGraph class Indexer class RecordPreparer - # Provides the ability to get a `RecordPreparer` for a specific JSON schema version. + # Provides the ability to get a `RecordPreparer` for a specific schema artifact version. class Factory def initialize(schema_artifacts) @schema_artifacts = schema_artifacts @@ -21,7 +21,7 @@ def initialize(schema_artifacts) hash[type_name] = scalar_types_by_name[type_name]&.load_indexing_preparer&.extension_class end # : ::Hash[::String, SchemaArtifacts::RuntimeMetadata::extensionClass?] - @preparers_by_json_schema_version = ::Hash.new do |hash, version| + @preparers_by_schema_version = ::Hash.new do |hash, version| hash[version] = RecordPreparer.new( indexing_preparer_by_scalar_type_name, build_type_metas_from(@schema_artifacts.json_schemas_for(version)) @@ -29,15 +29,15 @@ def initialize(schema_artifacts) end end - # Gets the `RecordPreparer` for the given JSON schema version. - def for_json_schema_version(json_schema_version) - @preparers_by_json_schema_version[json_schema_version] # : RecordPreparer + # Gets the `RecordPreparer` for the given schema artifact version. + def for_schema_version(schema_version) + @preparers_by_schema_version[schema_version] # : RecordPreparer end - # Gets the `RecordPreparer` for the latest JSON schema version. Intended primarily + # Gets the `RecordPreparer` for the latest schema artifact version. Intended primarily # for use in tests for convenience. - def for_latest_json_schema_version - for_json_schema_version(@schema_artifacts.latest_json_schema_version) + def for_latest_schema_version + for_schema_version(@schema_artifacts.latest_schema_version) end private diff --git a/elasticgraph-indexer/lib/elastic_graph/indexer/test_support/converters.rb b/elasticgraph-indexer/lib/elastic_graph/indexer/test_support/converters.rb index 31de51c26..93db74594 100644 --- a/elasticgraph-indexer/lib/elastic_graph/indexer/test_support/converters.rb +++ b/elasticgraph-indexer/lib/elastic_graph/indexer/test_support/converters.rb @@ -21,8 +21,8 @@ def self.upsert_event_for(record) "id" => record.fetch("id"), "type" => record.fetch("__typename"), "version" => record.fetch("__version"), - "record" => record.except("__typename", "__version", "__json_schema_version"), - JSON_SCHEMA_VERSION_KEY => record.fetch("__json_schema_version") + "record" => record.except("__typename", "__version", "__schema_version"), + SCHEMA_VERSION_KEY => record.fetch("__schema_version") } end diff --git a/elasticgraph-indexer/sig/elastic_graph/indexer/operation/factory.rbs b/elasticgraph-indexer/sig/elastic_graph/indexer/operation/factory.rbs index 82e968053..91cefadef 100644 --- a/elasticgraph-indexer/sig/elastic_graph/indexer/operation/factory.rbs +++ b/elasticgraph-indexer/sig/elastic_graph/indexer/operation/factory.rbs @@ -42,8 +42,9 @@ module ElasticGraph @validator_factories_by_version: ::Hash[::Integer, Support::JSONSchema::ValidatorFactory]? def validator_factories_by_version: () -> ::Hash[::Integer, Support::JSONSchema::ValidatorFactory] - def select_json_schema_version: (event) { (BuildResult) -> bot } -> (::Integer | bot) + def select_schema_version: (event, untyped) { (BuildResult) -> bot } -> (::Integer | bot) def prepare_event: (event) -> event + def schema_version_from: (event) -> untyped def validate_record_returning_failure: (event, ::Integer) -> BuildResult? def build_failed_result: (event, ::String, ::String) -> BuildResult def build_all_operations_for: (event, _RecordPreparer) -> ::Array[_Operation] diff --git a/elasticgraph-indexer/sig/elastic_graph/indexer/record_preparer.rbs b/elasticgraph-indexer/sig/elastic_graph/indexer/record_preparer.rbs index 1c4efcefc..e150359d7 100644 --- a/elasticgraph-indexer/sig/elastic_graph/indexer/record_preparer.rbs +++ b/elasticgraph-indexer/sig/elastic_graph/indexer/record_preparer.rbs @@ -11,13 +11,13 @@ module ElasticGraph class Factory def initialize: (schemaArtifacts) -> void - def for_json_schema_version: (::Integer) -> RecordPreparer - def for_latest_json_schema_version: () -> RecordPreparer + def for_schema_version: (::Integer) -> RecordPreparer + def for_latest_schema_version: () -> RecordPreparer private @schema_artifacts: schemaArtifacts - @preparers_by_json_schema_version: ::Hash[::Integer, RecordPreparer] + @preparers_by_schema_version: ::Hash[::Integer, RecordPreparer] def build_type_metas_from: (::Hash[::String, untyped]) -> ::Array[TypeMetadata] end diff --git a/elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb b/elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb index 314dc4eea..559cca462 100644 --- a/elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb +++ b/elasticgraph-indexer/spec/acceptance/schema_evolution_spec.rb @@ -96,7 +96,7 @@ def build_address_event_without_geolocation end def build_widget(json_schema_version:) - event = build_upsert_event(:widget, __json_schema_version: json_schema_version) + event = build_upsert_event(:widget, __schema_version: json_schema_version) event.merge("record" => (yield event.fetch("record"))) end end @@ -116,7 +116,7 @@ def build_widget(json_schema_version:) write_address_schema_def(json_schema_version: 2, address_extras: "t.deleted_field 'deprecated'") dump_artifacts - event = build_upsert_event(:address, id: "abc", deprecated: "foo", __json_schema_version: 1) + event = build_upsert_event(:address, id: "abc", deprecated: "foo", __schema_version: 1) expect(event.dig("record", "deprecated")).to eq("foo") boot_indexer.processor.process([event], refresh_indices: true) @@ -162,8 +162,8 @@ def get_address_payload(id) # included at that part of the JSON schema. So here we verify that the factory includes that. expect(build(:team_season)).to include(__typename: "TeamSeason") - v1_event = build_upsert_event(:team, __json_schema_version: 1) - v2_event = build_upsert_event(:team, __json_schema_version: 2) + v1_event = build_upsert_event(:team, __schema_version: 1) + v2_event = build_upsert_event(:team, __schema_version: 2) .then { |event| ::JSON.generate(event) } # Fix the event to align with the v2 schema, since `build_upsert_event` doesn't automatically # know that the `__typename` should be `SeasonOfATeam` instead of `TeamSeason`. @@ -200,8 +200,8 @@ def get_address_payload(id) end dump_artifacts - v1_event = build_upsert_event(:team, __json_schema_version: 1) - v2_event = build_upsert_event(:team, __json_schema_version: 2) + v1_event = build_upsert_event(:team, __schema_version: 1) + v2_event = build_upsert_event(:team, __schema_version: 2) expect { boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true) @@ -244,9 +244,9 @@ def get_address_payload(id) end dump_artifacts - v1_event = build_upsert_event(:team, __json_schema_version: 1) + v1_event = build_upsert_event(:team, __schema_version: 1) v1_event = ::JSON.parse(::JSON.generate(v1_event).gsub('"name":', '"full_name":')) - v2_event = build_upsert_event(:team, __json_schema_version: 2) + v2_event = build_upsert_event(:team, __schema_version: 2) expect { boot_indexer.processor.process([v1_event, v2_event], refresh_indices: true) @@ -288,7 +288,7 @@ def get_address_payload(id) end dump_artifacts - v1_event = build_upsert_event(:team, __json_schema_version: 1) + v1_event = build_upsert_event(:team, __schema_version: 1) expect { boot_indexer.processor.process([v1_event], refresh_indices: true) @@ -323,7 +323,7 @@ def get_address_payload(id) write_address_schema_def(json_schema_version: 2, schema_extras: 'schema.deleted_type "Team"') dump_artifacts - v1_event = build_upsert_event(:team, __json_schema_version: 1) + v1_event = build_upsert_event(:team, __schema_version: 1) boot_indexer.processor.process([v1_event], refresh_indices: true) expect(search_for_ids("teams")).to be_empty diff --git a/elasticgraph-indexer/spec/support/indexing_preparer.rb b/elasticgraph-indexer/spec/support/indexing_preparer.rb index df85ca869..3d43eae0e 100644 --- a/elasticgraph-indexer/spec/support/indexing_preparer.rb +++ b/elasticgraph-indexer/spec/support/indexing_preparer.rb @@ -29,7 +29,7 @@ schema.object_type "Object" do |t| t.field "scalar", scalar_type end - end).record_preparer_factory.for_latest_json_schema_version + end).record_preparer_factory.for_latest_schema_version end def prepare_scalar_value(value) diff --git a/elasticgraph-indexer/spec/support/multiple_version_support.rb b/elasticgraph-indexer/spec/support/multiple_version_support.rb index e3b044aab..c9925459b 100644 --- a/elasticgraph-indexer/spec/support/multiple_version_support.rb +++ b/elasticgraph-indexer/spec/support/multiple_version_support.rb @@ -27,6 +27,8 @@ def build_indexer_with_multiple_schema_versions(schema_versions:) allow(artifacts).to receive(:available_json_schema_versions).and_return(json_schemas_by_version.keys.to_set) allow(artifacts).to receive(:latest_json_schema_version).and_return(json_schemas_by_version.keys.max) + allow(artifacts).to receive(:available_schema_versions).and_return(json_schemas_by_version.keys.to_set) + allow(artifacts).to receive(:latest_schema_version).and_return(json_schemas_by_version.keys.max) allow(artifacts).to receive(:json_schemas_for) do |version| json_schemas_by_version.fetch(version) end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb index 7702a195a..0629c5a38 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/datastore_indexing_router_spec.rb @@ -601,7 +601,7 @@ def new_operation(event, update_target: nil, **overrides) arguments = { event: event, - prepared_record: indexer.record_preparer_factory.for_latest_json_schema_version.prepare_for_index( + prepared_record: indexer.record_preparer_factory.for_latest_schema_version.prepare_for_index( event.fetch("type"), event.fetch("record"), destination_index_mapping.fetch("properties") diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb index d420a40a3..332ca9e6a 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/count_accumulator_spec.rb @@ -300,7 +300,7 @@ def script_params_for(data:, source_type:, destination_type:, indexer: build_ind update = Update.new( event: {"type" => source_type, "record" => data}, destination_index_def: destination_index_def, - prepared_record: indexer.record_preparer_factory.for_latest_json_schema_version.prepare_for_index( + prepared_record: indexer.record_preparer_factory.for_latest_schema_version.prepare_for_index( source_type, data, destination_index_mapping.fetch("properties") diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/factory_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/factory_spec.rb index 1fed90d64..f25e55b67 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/factory_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/factory_spec.rb @@ -36,7 +36,7 @@ module Operation "type" => "Widget", "version" => 1, "record" => event["record"], - JSON_SCHEMA_VERSION_KEY => 1 + SCHEMA_VERSION_KEY => 1 } expect(build_expecting_success(event)).to contain_exactly( @@ -85,7 +85,7 @@ module Operation "type" => "Component", "version" => 1, "record" => event["record"], - JSON_SCHEMA_VERSION_KEY => 1 + SCHEMA_VERSION_KEY => 1 }.merge(latency_timestamps))]) end @@ -132,7 +132,7 @@ module Operation "id" => "1", "type" => "MyOwnInvalidGraphQlType", "version" => 1, - JSON_SCHEMA_VERSION_KEY => 1, + SCHEMA_VERSION_KEY => 1, "record" => {"field1" => "value1", "field2" => "value2", "id" => "1"} } @@ -146,7 +146,7 @@ module Operation "id" => "1", "type" => "WidgetOptions", "version" => 1, - JSON_SCHEMA_VERSION_KEY => 1, + SCHEMA_VERSION_KEY => 1, "record" => {"field1" => "value1", "field2" => "value2", "id" => "1"} } @@ -193,17 +193,29 @@ module Operation expect_failed_event_error(event, "missing_keys", "version") end - it "notifies an error on missing `#{JSON_SCHEMA_VERSION_KEY}`" do - event = build_upsert_event(:component).except(JSON_SCHEMA_VERSION_KEY) + it "defaults to the latest schema version when no schema version is specified" do + event = build_upsert_event(:component).except(SCHEMA_VERSION_KEY) - expect_failed_event_error(event, JSON_SCHEMA_VERSION_KEY) + expect(build_expecting_success(event)).to contain_exactly(new_primary_indexing_operation( + event.merge(SCHEMA_VERSION_KEY => 1) + )) + end + + it "accepts schema_version to select an artifact version" do + event = build_upsert_event(:component, id: "1", __version: 1) + .except(SCHEMA_VERSION_KEY) + .merge(SCHEMA_VERSION_KEY => 1) + + expect(build_expecting_success(event)).to eq([new_primary_indexing_operation( + event.except(SCHEMA_VERSION_KEY).merge(SCHEMA_VERSION_KEY => 1) + )]) end it "notifies an error on wrong field types" do event = { "op" => "upsert", "id" => 1, - JSON_SCHEMA_VERSION_KEY => 1, + SCHEMA_VERSION_KEY => 1, "type" => [], "version" => "1", "record" => "" @@ -249,7 +261,7 @@ module Operation context "when the indexer has json schemas v2 and v4 (v4 adds yellow color)" do before do # With the "real" version one as a baseline, create a separate version with a small schema change. - # Tests will then specify the desired json_schema_version in the event payload to test the schema-choosing + # Tests will then specify the desired schema version in the event payload to test the schema-choosing # behavior of the `factory` class. schemas = { 2 => indexer.schema_artifacts.json_schemas_for(1), @@ -258,8 +270,8 @@ module Operation end } - allow(indexer.schema_artifacts).to receive(:available_json_schema_versions).and_return(schemas.keys.to_set) - allow(indexer.schema_artifacts).to receive(:latest_json_schema_version).and_return(schemas.keys.max) + allow(indexer.schema_artifacts).to receive(:available_schema_versions).and_return(schemas.keys.to_set) + allow(indexer.schema_artifacts).to receive(:latest_schema_version).and_return(schemas.keys.max) allow(indexer.schema_artifacts).to receive(:json_schemas_for) do |version| ::Marshal.load(::Marshal.dump(schemas.fetch(version))).tap do |schema| schema[JSON_SCHEMA_VERSION_KEY] = version @@ -269,15 +281,15 @@ module Operation end it "validates against an older version of a json schema if specified" do - # YELLOW doesn't exist in schema version 2. So expect an error when json_schema_version is set to 2. - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: 2) + # YELLOW doesn't exist in schema version 2. So expect an error when schema_version is set to 2. + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: 2) event["record"]["options"]["color"] = "YELLOW" expect_failed_event_error(event, "/options/color") end it "validates against the latest version of a json schema if specified" do - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: 4) + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: 4) event["record"]["options"]["color"] = "YELLOW" expect(build_expecting_success(event)).to include(new_primary_indexing_operation({ @@ -286,13 +298,13 @@ module Operation "type" => "Widget", "version" => 1, "record" => event["record"], - JSON_SCHEMA_VERSION_KEY => 4 + SCHEMA_VERSION_KEY => 4 }, index_def: index_def_named("widgets"))) end it "validates against the closest version if the requested version is newer than what's available" do # 5 is closest to "4", validation should match behavior from version "4" - YELLOW should pass validation. - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: 5) + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: 5) event["record"]["options"]["color"] = "YELLOW" expect(build_expecting_success(event)).to include(new_primary_indexing_operation({ @@ -301,20 +313,20 @@ module Operation "type" => "Widget", "version" => 1, "record" => event["record"], - JSON_SCHEMA_VERSION_KEY => 5 # Originally-specified version. + SCHEMA_VERSION_KEY => 5 # Originally-specified version. }, index_def: index_def_named("widgets"))) - expect(logged_jsons_of_type("ElasticGraphMissingJSONSchemaVersion").last).to include( + expect(logged_jsons_of_type("ElasticGraphMissingSchemaVersion").last).to include( "event_id" => "Widget:1@v1", "event_type" => "Widget", - "requested_json_schema_version" => 5, - "selected_json_schema_version" => 4 + "requested_schema_version" => 5, + "selected_schema_version" => 4 ) end it "validates against the closest version if the requested version older than what's available" do # 1 is closest to "2", validation should match behavior from version "2" - YELLOW should fail validation. - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: 1).merge("message_id" => "m123") + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: 1).merge("message_id" => "m123") event["record"]["options"]["color"] = "YELLOW" # Should fail, but should still log the version mismatch as well. @@ -325,17 +337,17 @@ module Operation "/options/color" ) - expect(logged_jsons_of_type("ElasticGraphMissingJSONSchemaVersion").last).to include( + expect(logged_jsons_of_type("ElasticGraphMissingSchemaVersion").last).to include( "event_id" => "Widget:1@v1", "message_id" => "m123", "event_type" => "Widget", - "requested_json_schema_version" => 1, - "selected_json_schema_version" => 2 + "requested_schema_version" => 1, + "selected_schema_version" => 2 ) end it "validates against a version newer than what's requested, if the requested version is equidistant from two available versions" do - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: 3) + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: 3) event["record"]["options"]["color"] = "YELLOW" expect(build_expecting_success(event)).to include(new_primary_indexing_operation({ @@ -344,30 +356,31 @@ module Operation "type" => "Widget", "version" => 1, "record" => event["record"], - JSON_SCHEMA_VERSION_KEY => 3 # Originally-specified version. + SCHEMA_VERSION_KEY => 3 # Originally-specified version. }, index_def: index_def_named("widgets"))) - expect(logged_jsons_of_type("ElasticGraphMissingJSONSchemaVersion").last).to include( + expect(logged_jsons_of_type("ElasticGraphMissingSchemaVersion").last).to include( "event_id" => "Widget:1@v1", "event_type" => "Widget", - "requested_json_schema_version" => 3, - "selected_json_schema_version" => 4 + "requested_schema_version" => 3, + "selected_schema_version" => 4 ) end - it "notifies an error if an invalid (e.g. negative) json_schema_version is specified" do - event = build_upsert_event(:widget, id: "1", __version: 1, __json_schema_version: -1) + it "notifies an error if an invalid (e.g. negative) schema version is specified" do + event = build_upsert_event(:widget, id: "1", __version: 1, __schema_version: -1) expect_failed_event_error(event, "must be a positive integer", "(-1)") end - it "notifies an error if it's unable to select a json_schema_version" do + it "notifies an error if it's unable to select a schema version" do event = build_upsert_event(:component, id: "1", __version: 1) event["record"]["name"] = 123 fake_empty_schema_artifacts = instance_double( "ElasticGraph::SchemaArtifacts::FromDisk", - available_json_schema_versions: Set[], + available_schema_versions: Set[], + event_for_schema_version_validation: nil, runtime_metadata: indexer.schema_artifacts.runtime_metadata, indices: indexer.schema_artifacts.indices, index_templates: indexer.schema_artifacts.index_templates, @@ -376,7 +389,7 @@ module Operation operation_factory = build_indexer(schema_artifacts: fake_empty_schema_artifacts).operation_factory - expect_failed_event_error(event, "Failed to select json schema version", factory: operation_factory) + expect_failed_event_error(event, "Failed to select schema version", factory: operation_factory) end end @@ -445,7 +458,7 @@ def widget_currency_derived_update_operation_for(event) operations = Update.operations_for( event: event, destination_index_def: index_def_named("widget_currencies"), - record_preparer: indexer.record_preparer_factory.for_latest_json_schema_version, + record_preparer: indexer.record_preparer_factory.for_latest_schema_version, update_target: indexer.schema_artifacts.runtime_metadata.object_types_by_name.fetch("Widget").update_targets.first, destination_index_mapping: indexer.schema_artifacts.index_mappings_by_index_def_name.fetch("widget_currencies") ) diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb index 686611ceb..47bdd0301 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/operation/update_spec.rb @@ -460,7 +460,7 @@ def operations_for_indexer(indexer, event: self.event, source_type: "Widget", de Update.operations_for( event: event, destination_index_def: index_defs_by_name.fetch(destination_index), - record_preparer: indexer.record_preparer_factory.for_latest_json_schema_version, + record_preparer: indexer.record_preparer_factory.for_latest_schema_version, update_target: update_target, destination_index_mapping: indexer.schema_artifacts.index_mappings_by_index_def_name.fetch(destination_index) ) diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/record_preparer_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/record_preparer_spec.rb index c179e10eb..4c1d5ec84 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/record_preparer_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/record_preparer_spec.rb @@ -35,21 +35,21 @@ class Indexer }).record_preparer_factory end - describe "#for_json_schema_version" do + describe "#for_schema_version" do it "memoizes `RecordPreparer` since they are immutable and that saves on memory" do - for_v1 = factory_with_multiple_versions.for_json_schema_version(1) - for_v2 = factory_with_multiple_versions.for_json_schema_version(2) + for_v1 = factory_with_multiple_versions.for_schema_version(1) + for_v2 = factory_with_multiple_versions.for_schema_version(2) expect(for_v1).not_to eq(for_v2) - expect(factory_with_multiple_versions.for_json_schema_version(1)).to be for_v1 + expect(factory_with_multiple_versions.for_schema_version(1)).to be for_v1 end end - describe "#for_latest_json_schema_version" do - it "returns the record preparer for the latest JSON schema version" do - for_v2 = factory_with_multiple_versions.for_json_schema_version(2) + describe "#for_latest_schema_version" do + it "returns the record preparer for the latest schema version" do + for_v2 = factory_with_multiple_versions.for_schema_version(2) - expect(factory_with_multiple_versions.for_latest_json_schema_version).to be for_v2 + expect(factory_with_multiple_versions.for_latest_schema_version).to be for_v2 end end end @@ -572,7 +572,7 @@ def build_preparer_for_old_json_schema_version(v1_def:, v2_def:) allow(v2_results).to receive(:json_schemas_for).with(1).and_return(v1_merge_result.json_schema) - RecordPreparer::Factory.new(v2_results).for_json_schema_version(1) + RecordPreparer::Factory.new(v2_results).for_schema_version(1) end def define_schema(&schema_definition) @@ -587,12 +587,12 @@ def define_schema(&schema_definition) def build_preparer(**config_overrides, &schema_definition) build_indexer(schema_definition: schema_definition, **config_overrides) .record_preparer_factory - .for_latest_json_schema_version + .for_latest_schema_version end def build_preparer_with_artifacts(**config_overrides, &schema_definition) indexer = build_indexer(schema_definition: schema_definition, **config_overrides) - preparer = indexer.record_preparer_factory.for_latest_json_schema_version + preparer = indexer.record_preparer_factory.for_latest_schema_version [preparer, indexer.schema_artifacts] end end diff --git a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/test_support/converters_spec.rb b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/test_support/converters_spec.rb index bfed1c8f7..615d6cdf5 100644 --- a/elasticgraph-indexer/spec/unit/elastic_graph/indexer/test_support/converters_spec.rb +++ b/elasticgraph-indexer/spec/unit/elastic_graph/indexer/test_support/converters_spec.rb @@ -19,7 +19,7 @@ module TestSupport "id" => "1", "__version" => 1, "__typename" => "Widget", - "__json_schema_version" => 1, + "__schema_version" => 1, "field1" => "value1", "field2" => "value2" } @@ -30,7 +30,7 @@ module TestSupport "version" => 1, "type" => "Widget", "record" => {"id" => "1", "field1" => "value1", "field2" => "value2"}, - JSON_SCHEMA_VERSION_KEY => 1 + SCHEMA_VERSION_KEY => 1 ) end end @@ -41,7 +41,7 @@ module TestSupport "id" => "1", "__typename" => "Widget", "__version" => 1, - "__json_schema_version" => 1, + "__schema_version" => 1, "field1" => "value1", "field2" => "value2" } @@ -50,7 +50,7 @@ module TestSupport "id" => "2", "__typename" => "Address", "__version" => 5, - "__json_schema_version" => 1, + "__schema_version" => 1, "field3" => "value5" } @@ -63,7 +63,7 @@ module TestSupport "version" => 1, "type" => "Widget", "record" => {"id" => "1", "field1" => "value1", "field2" => "value2"}, - JSON_SCHEMA_VERSION_KEY => 1 + SCHEMA_VERSION_KEY => 1 }, { "op" => "upsert", @@ -71,7 +71,7 @@ module TestSupport "version" => 5, "type" => "Address", "record" => {"id" => "2", "field3" => "value5"}, - JSON_SCHEMA_VERSION_KEY => 1 + SCHEMA_VERSION_KEY => 1 } ]) end diff --git a/elasticgraph-json_ingestion/README.md b/elasticgraph-json_ingestion/README.md index 4b32d9cf1..6525c494a 100644 --- a/elasticgraph-json_ingestion/README.md +++ b/elasticgraph-json_ingestion/README.md @@ -7,6 +7,19 @@ events and validates JSON-ingestion-specific schema options. Generated ElasticGr and enable it by default. Applications that wire schema-definition tasks manually enable it by adding `ElasticGraph::JSONIngestion::SchemaDefinition::APIExtension` to their schema-definition extension modules. +## Indexing Event Decoder + +JSON ingestion payloads can include `json_schema_version` to request a specific JSON schema artifact +version. Configure the JSON ingestion decoder when the indexer consumes those payloads so the JSON-specific +field is mapped to the indexer's generic `schema_version` event field. + +```yaml +indexer: + indexing_event_decoder: + name: ElasticGraph::JSONIngestion::IndexingEventDecoder::JSONLines + require_path: elastic_graph/json_ingestion/indexing_event_decoder +``` + ## Schema Definition APIs Use `schema.json_schema_version` to identify the current JSON schema artifact. Every change that affects diff --git a/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/indexing_event_decoder.rb b/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/indexing_event_decoder.rb new file mode 100644 index 000000000..8271dbd56 --- /dev/null +++ b/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/indexing_event_decoder.rb @@ -0,0 +1,43 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/constants" +require "json" + +module ElasticGraph + module JSONIngestion + module IndexingEventDecoder + # Indexing event decoder for JSON ingestion payloads represented as newline-delimited JSON objects. + class JSONLines + # @param config [Hash] configuration from the `indexing_event_decoder.config` setting + # @param schema_artifacts [SchemaArtifacts::FromDisk] the schema artifacts + # @param logger [Logger] the ElasticGraph logger + def initialize(config:, schema_artifacts:, logger:) + # must be defined for extension interface verification, but nothing to do + end + + # @param payload [String] a raw JSON Lines payload + # @return [Array>] the decoded ElasticGraph indexing events + def decode(payload) + payload.split("\n").map do |event_json| + event = ::JSON.parse(event_json) + adapt_json_schema_version(event) + end + end + + private + + def adapt_json_schema_version(event) + return event unless event.key?(JSON_SCHEMA_VERSION_KEY) + + event.except(JSON_SCHEMA_VERSION_KEY).merge(SCHEMA_VERSION_KEY => event.fetch(JSON_SCHEMA_VERSION_KEY)) + end + end + end + end +end diff --git a/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/schema_definition/results_extension.rb b/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/schema_definition/results_extension.rb index 03df3504b..f577b700e 100644 --- a/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/schema_definition/results_extension.rb +++ b/elasticgraph-json_ingestion/lib/elastic_graph/json_ingestion/schema_definition/results_extension.rb @@ -35,11 +35,28 @@ def available_json_schema_versions @available_json_schema_versions ||= Set[latest_json_schema_version] end + # @return [Set] set of available schema versions + def available_schema_versions + available_json_schema_versions + end + # @return [Integer] the current JSON schema version def latest_json_schema_version current_public_json_schema[JSON_SCHEMA_VERSION_KEY] end + # @return [Integer] the current schema version + def latest_schema_version + latest_json_schema_version + end + + # @param event [Hash] the ElasticGraph indexing event + # @param schema_version [Integer] the schema artifact version selected for validation + # @return [Hash] + def event_for_schema_version_validation(event, schema_version) + event.except(SCHEMA_VERSION_KEY).merge(JSON_SCHEMA_VERSION_KEY => schema_version) + end + # @private def json_schema_version_setter_location json_ingestion_state.json_schema_version_setter_location diff --git a/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/indexing_event_decoder.rbs b/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/indexing_event_decoder.rbs new file mode 100644 index 000000000..8c7a346d3 --- /dev/null +++ b/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/indexing_event_decoder.rbs @@ -0,0 +1,21 @@ +module ElasticGraph + module JSONIngestion + module IndexingEventDecoder + class JSONLines + def initialize: ( + config: ::Hash[::Symbol | ::String, untyped], + schema_artifacts: untyped, + logger: untyped + ) -> void + + def decode: (::String) -> ::Array[::Hash[::String, untyped]] + + private + + def adapt_json_schema_version: ( + ::Hash[::String, untyped] + ) -> ::Hash[::String, untyped] + end + end + end +end diff --git a/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/schema_definition/results_extension.rbs b/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/schema_definition/results_extension.rbs index 67fcdd87d..6ed557de6 100644 --- a/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/schema_definition/results_extension.rbs +++ b/elasticgraph-json_ingestion/sig/elastic_graph/json_ingestion/schema_definition/results_extension.rbs @@ -5,6 +5,12 @@ module ElasticGraph def json_schemas_for: (::Integer) -> ::Hash[::String, untyped] def available_json_schema_versions: () -> ::Set[::Integer] def latest_json_schema_version: () -> ::Integer + def available_schema_versions: () -> ::Set[::Integer] + def latest_schema_version: () -> ::Integer + def event_for_schema_version_validation: ( + ::Hash[::String, untyped], + ::Integer + ) -> ::Hash[::String, untyped] def json_schema_version_setter_location: () -> ::Thread::Backtrace::Location? def json_schema_field_metadata_by_type_and_field_name: () -> ::Hash[::String, ::Hash[::String, Indexing::JSONSchemaFieldMetadata]] def current_public_json_schema: () -> ::Hash[::String, untyped] diff --git a/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/indexing_event_decoder_spec.rb b/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/indexing_event_decoder_spec.rb new file mode 100644 index 000000000..35ef9d34b --- /dev/null +++ b/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/indexing_event_decoder_spec.rb @@ -0,0 +1,36 @@ +# Copyright 2024 - 2026 Block, Inc. +# +# Use of this source code is governed by an MIT-style +# license that can be found in the LICENSE file or at +# https://opensource.org/licenses/MIT. +# +# frozen_string_literal: true + +require "elastic_graph/json_ingestion/indexing_event_decoder" + +module ElasticGraph + module JSONIngestion + RSpec.describe IndexingEventDecoder::JSONLines do + it "decodes newline-delimited JSON objects" do + decoder = described_class.new(config: {}, schema_artifacts: nil, logger: nil) + payload = <<~JSONL + {"op":"upsert","id":"1"} + {"op":"upsert","id":"2"} + JSONL + + expect(decoder.decode(payload)).to eq([ + {"op" => "upsert", "id" => "1"}, + {"op" => "upsert", "id" => "2"} + ]) + end + + it "maps json_schema_version to the indexer's generic schema_version key" do + decoder = described_class.new(config: {}, schema_artifacts: nil, logger: nil) + + expect(decoder.decode('{"op":"upsert","id":"1","json_schema_version":3}')).to eq([ + {"op" => "upsert", "id" => "1", SCHEMA_VERSION_KEY => 3} + ]) + end + end + end +end diff --git a/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/schema_definition/json_schema_spec.rb b/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/schema_definition/json_schema_spec.rb index 00ca08d2b..e4a2b8c09 100644 --- a/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/schema_definition/json_schema_spec.rb +++ b/elasticgraph-json_ingestion/spec/unit/elastic_graph/json_ingestion/schema_definition/json_schema_spec.rb @@ -2967,11 +2967,17 @@ def link_supertype_to_subtypes(interface_type, *subtype_names) end it "sets json_schema_version to the specified (valid) value" do - result = define_schema(schema_element_name_form: "snake_case") do |s| + results = define_schema(schema_element_name_form: "snake_case") do |s| s.json_schema_version 1 - end.json_schemas_for(1) + end - expect(result[JSON_SCHEMA_VERSION_KEY]).to eq(1) + expect(results.json_schemas_for(1)[JSON_SCHEMA_VERSION_KEY]).to eq(1) + expect(results.available_schema_versions).to eq(Set[1]) + expect(results.latest_schema_version).to eq(1) + expect(results.event_for_schema_version_validation({"id" => "1", SCHEMA_VERSION_KEY => 2}, 1)).to eq( + "id" => "1", + JSON_SCHEMA_VERSION_KEY => 1 + ) end it "allows json_schema_version enforcement to be disabled" do diff --git a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/from_disk.rb b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/from_disk.rb index 742c80503..d16d24197 100644 --- a/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/from_disk.rb +++ b/elasticgraph-schema_artifacts/lib/elastic_graph/schema_artifacts/from_disk.rb @@ -92,6 +92,13 @@ def available_json_schema_versions end end + # Provides the set of available schema versions for indexing events. + # + # @return [Set] + def available_schema_versions + available_json_schema_versions + end + # Provides the latest JSON schema version. # # @return [Integer] @@ -110,6 +117,22 @@ def latest_json_schema_version ) end + # Provides the latest schema version for indexing events. + # + # @return [Integer] + def latest_schema_version + latest_json_schema_version + end + + # Adapts an indexing event for validating it against the schema artifact at the given version. + # + # @param event [Hash] the ElasticGraph indexing event + # @param schema_version [Integer] the schema artifact version selected for validation + # @return [Hash] + def event_for_schema_version_validation(event, schema_version) + event.except(SCHEMA_VERSION_KEY).merge(JSON_SCHEMA_VERSION_KEY => schema_version) + end + # Provides the datastore configuration. The datastore configuration defines the full configuration--including indices, templates, # and scripts--required in the datastore (Elasticsearch or OpenSearch) by ElasticGraph for the current schema. # diff --git a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts.rbs b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts.rbs index e63129b2e..e4e0a09f2 100644 --- a/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts.rbs +++ b/elasticgraph-schema_artifacts/sig/elastic_graph/schema_artifacts.rbs @@ -13,6 +13,12 @@ module ElasticGraph def json_schemas_for: (::Integer) -> ::Hash[::String, untyped] def available_json_schema_versions: () -> ::Set[::Integer] def latest_json_schema_version: () -> ::Integer + def available_schema_versions: () -> ::Set[::Integer] + def latest_schema_version: () -> ::Integer + def event_for_schema_version_validation: ( + ::Hash[::String, untyped], + ::Integer + ) -> ::Hash[::String, untyped] end module SchemaArtifacts diff --git a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/from_disk_spec.rb b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/from_disk_spec.rb index 6708554da..a26c212d8 100644 --- a/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/from_disk_spec.rb +++ b/elasticgraph-schema_artifacts/spec/unit/elastic_graph/schema_artifacts/from_disk_spec.rb @@ -40,6 +40,10 @@ module SchemaArtifacts expect(available_versions).not_to include(nil) # No `nil` values should be present. end + it "lists the available schema versions" do + expect(artifacts.available_schema_versions).to eq(artifacts.available_json_schema_versions) + end + it "raises if an unavailable json_schema version is requested" do expect { artifacts.json_schemas_for(9999) @@ -49,6 +53,19 @@ module SchemaArtifacts it "returns the largest JSON schema version as the `latest_json_schema_version`" do expect(artifacts.latest_json_schema_version).to eq 2 end + + it "returns the latest schema version" do + expect(artifacts.latest_schema_version).to eq 2 + end + + it "adapts events for schema version validation" do + event = {"id" => "1", SCHEMA_VERSION_KEY => 1} + + expect(artifacts.event_for_schema_version_validation(event, 2)).to eq( + "id" => "1", + JSON_SCHEMA_VERSION_KEY => 2 + ) + end end context "before any artifacts have been dumped", :in_temp_dir do @@ -65,10 +82,18 @@ module SchemaArtifacts expect(artifacts.available_json_schema_versions).to eq Set.new end + it "returns an empty set from `available_schema_versions`" do + expect(artifacts.available_schema_versions).to eq Set.new + end + it "raises an error from `latest_json_schema_version`" do expect { artifacts.latest_json_schema_version }.to raise_missing_artifacts_error end + it "raises an error from `latest_schema_version`" do + expect { artifacts.latest_schema_version }.to raise_missing_artifacts_error + end + def raise_missing_artifacts_error raise_error Errors::MissingSchemaArtifactError, a_string_including("could not be found", artifacts.artifacts_dir) end diff --git a/elasticgraph-support/lib/elastic_graph/constants.rb b/elasticgraph-support/lib/elastic_graph/constants.rb index bf6dcfbc0..5a9408e01 100644 --- a/elasticgraph-support/lib/elastic_graph/constants.rb +++ b/elasticgraph-support/lib/elastic_graph/constants.rb @@ -137,6 +137,10 @@ module ElasticGraph # @private JSON_SCHEMA_VERSION_KEY = "json_schema_version" + # Name for field in indexing events that identifies the schema artifact version to use. + # @private + SCHEMA_VERSION_KEY = "schema_version" + # String that goes in the middle of a rollover index name, used to mark it as a rollover # index (and split on to parse a rollover index name). # @private diff --git a/elasticgraph-support/sig/elastic_graph/constants.rbs b/elasticgraph-support/sig/elastic_graph/constants.rbs index 7d8e55585..a8fb7a1ed 100644 --- a/elasticgraph-support/sig/elastic_graph/constants.rbs +++ b/elasticgraph-support/sig/elastic_graph/constants.rbs @@ -22,6 +22,7 @@ module ElasticGraph ROLLOVER_INDEX_INFIX_MARKER: ::String JSON_SCHEMAS_BY_VERSION_DIRECTORY: ::String JSON_SCHEMA_VERSION_KEY: ::String + SCHEMA_VERSION_KEY: ::String DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE: ::String INDEX_DATA_UPDATE_SCRIPT_ID: ::String UPDATE_WAS_NOOP_MESSAGE_PREAMBLE: ::String diff --git a/elasticgraph-warehouse_lambda/README.md b/elasticgraph-warehouse_lambda/README.md index 62dd339a2..25539d86e 100644 --- a/elasticgraph-warehouse_lambda/README.md +++ b/elasticgraph-warehouse_lambda/README.md @@ -4,7 +4,7 @@ Write ElasticGraph-shaped JSONL files to S3, packaged for AWS Lambda. This gem adapts ElasticGraph's indexing pipeline so that, instead of writing to the datastore, it writes batched, gzipped [JSON Lines](https://jsonlines.org/) (JSONL) files to Amazon S3. Each line in the file -conforms to a specific JSON Schema version for the corresponding object type, with files partitioned by schema version. +conforms to a specific schema version for the corresponding object type, with files partitioned by schema version. **Note:** This code does not deduplicate when writing to S3, so the data will contain all events and versions published, plus any Lambda retries. Consumers of the S3 bucket are responsible for @@ -33,10 +33,10 @@ graph LR; ## What it does -- Consumes ElasticGraph indexing operations and groups them by GraphQL type and JSON schema version +- Consumes ElasticGraph indexing operations and groups them by GraphQL type and schema version - Transforms each operation into a flattened JSON document that matches your ElasticGraph schema -- Writes one gzipped JSONL file per type per JSON schema version per batch to S3 with deterministic keys: - - `s3://///v//.jsonl.gz` +- Writes one gzipped JSONL file per type per schema version per batch to S3 with deterministic keys: + - `s3://///v//.jsonl.gz` - Emits structured logs for observability (counts, sizes, S3 key, etc.) ## When to use it @@ -67,13 +67,13 @@ warehouse: Files are written with the following S3 key format: ``` -//v//.jsonl.gz +//v//.jsonl.gz ``` - **s3_path_prefix**: Configurable in YAML (warehouse.s3_path_prefix). This is the full prefix you control, so you can organize your data however you like (e.g., "dumped-data/Data001" or "prod/analytics/v2"). - **TypeName**: GraphQL type from the ElasticGraph event -- **json_schema_version**: The JSON Schema version **selected based on the ingested event's requested version** +- **schema_version**: The schema version **selected based on the ingested event's requested version** (or the closest available version if the exact version isn't available). This ensures data partitioning matches the actual schema version used to process each event, making it easier to handle schema evolution and version-specific data processing. diff --git a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb index bf1d8e8af..459258f11 100644 --- a/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb +++ b/elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb @@ -34,20 +34,20 @@ def initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:) end # Processes a batch of indexing operations by dumping them to S3 as gzipped JSONL files. - # Operations are grouped by GraphQL type and JSON schema version, with each group written to a separate file. + # Operations are grouped by GraphQL type and schema version, with each group written to a separate file. # # @param operations [Array] the indexing operations to process # @param refresh [Boolean] ignored (included for interface compatibility with DatastoreIndexingRouter) # @return [BulkResult] result containing success status for all operations def bulk(operations, refresh: false) - operations_by_type_and_json_schema_version = operations.group_by { |op| [op.event.fetch("type"), op.event.fetch(JSON_SCHEMA_VERSION_KEY)] } + operations_by_type_and_schema_version = operations.group_by { |op| [op.event.fetch("type"), op.event.fetch(SCHEMA_VERSION_KEY)] } @logger.info({ "message_type" => LOG_MSG_RECEIVED_BATCH, - "record_counts_by_type" => operations_by_type_and_json_schema_version.transform_keys { |(type, _json_schema_version)| type }.transform_values(&:size) + "record_counts_by_type" => operations_by_type_and_schema_version.transform_keys { |(type, _schema_version)| type }.transform_values(&:size) }) - operations_by_type_and_json_schema_version.each do |(type, json_schema_version), operations| + operations_by_type_and_schema_version.each do |(type, schema_version), operations| # Operations coming from the indexer are always Update operations for warehouse dumping update_operations = operations # : ::Array[::ElasticGraph::Indexer::Operation::Update] jsonl_data = build_jsonl_file_from(update_operations) @@ -56,7 +56,7 @@ def bulk(operations, refresh: false) next if jsonl_data.empty? gzip_data = compress(jsonl_data) - s3_key = generate_s3_key_for(type, json_schema_version) + s3_key = generate_s3_key_for(type, schema_version) # Use if_none_match: "*" to prevent overwrites (defense-in-depth, though UUIDs make collisions impossible) @s3_client.put_object( @@ -72,7 +72,7 @@ def bulk(operations, refresh: false) "s3_bucket" => @s3_bucket_name, "s3_key" => s3_key, "type" => type, - JSON_SCHEMA_VERSION_KEY => json_schema_version, + SCHEMA_VERSION_KEY => schema_version, "record_count" => operations.size, "json_size" => jsonl_data.bytesize, "gzip_size" => gzip_data.bytesize @@ -97,14 +97,14 @@ def source_event_versions_in_index(operations) private - def generate_s3_key_for(type, json_schema_version) + def generate_s3_key_for(type, schema_version) date = @clock.now.utc.strftime("%Y-%m-%d") uuid = ::SecureRandom.uuid [ @s3_file_prefix, type, - "v#{json_schema_version}", + "v#{schema_version}", date, "#{uuid}.jsonl.gz" ].join("/") diff --git a/elasticgraph-warehouse_lambda/spec/unit/elastic_graph/warehouse_lambda/warehouse_dumper_spec.rb b/elasticgraph-warehouse_lambda/spec/unit/elastic_graph/warehouse_lambda/warehouse_dumper_spec.rb index 47109d580..cac7a3ffe 100644 --- a/elasticgraph-warehouse_lambda/spec/unit/elastic_graph/warehouse_lambda/warehouse_dumper_spec.rb +++ b/elasticgraph-warehouse_lambda/spec/unit/elastic_graph/warehouse_lambda/warehouse_dumper_spec.rb @@ -30,22 +30,22 @@ class WarehouseLambda "type" => "Widget", "id" => "1", "version" => 3, - "json_schema_version" => 1, + SCHEMA_VERSION_KEY => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"} }) end it "writes operations to S3 as gzipped JSONL files and returns success results" do - op1 = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, "json_schema_version" => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) - op2 = new_primary_indexing_operation({"type" => "Widget", "id" => "2", "version" => 5, "json_schema_version" => 2, "record" => {"id" => "2", "dayOfWeek" => "TUE", "created_at" => "2024-09-15T13:30:12Z", "workspace_id" => "ws-2"}}) + op1 = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, SCHEMA_VERSION_KEY => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) + op2 = new_primary_indexing_operation({"type" => "Widget", "id" => "2", "version" => 5, SCHEMA_VERSION_KEY => 2, "record" => {"id" => "2", "dayOfWeek" => "TUE", "created_at" => "2024-09-15T13:30:12Z", "workspace_id" => "ws-2"}}) operations = [op1, op2] results = warehouse_dumper.bulk(operations) - # Verify S3 uploads - should have 2 files (one for json_schema_version 1, one for json_schema_version 2) + # Verify S3 uploads - should have 2 files (one for schema version 1, one for schema version 2) expect(s3_client.api_requests.map { |req| req[:operation_name] }).to eq [:put_object, :put_object] - # Verify first file (json_schema_version 1) + # Verify first file (schema version 1) params1 = s3_client.api_requests[0].fetch(:params) expect(params1[:bucket]).to eq s3_bucket_name expect(params1[:key]).to match %r{Data0001/Widget/v1/2024-09-15/[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\.jsonl\.gz} @@ -61,7 +61,7 @@ class WarehouseLambda lines1 = jsonl_content1.split("\n") expect(lines1.size).to eq 1 - # Verify second file (json_schema_version 2) + # Verify second file (schema version 2) params2 = s3_client.api_requests[1].fetch(:params) expect(params2[:key]).to match %r{Data0001/Widget/v2/2024-09-15/[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\.jsonl\.gz} @@ -99,8 +99,8 @@ class WarehouseLambda end it "writes operations of different types to separate S3 files" do - widget_op = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, "json_schema_version" => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) - component_op = new_primary_indexing_operation({"type" => "Component", "id" => "c1", "version" => 2, "json_schema_version" => 1, "record" => {"id" => "c1", "created_at" => "2024-09-15T12:30:12Z"}}) + widget_op = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, SCHEMA_VERSION_KEY => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) + component_op = new_primary_indexing_operation({"type" => "Component", "id" => "c1", "version" => 2, SCHEMA_VERSION_KEY => 1, "record" => {"id" => "c1", "created_at" => "2024-09-15T12:30:12Z"}}) operations = [widget_op, component_op] warehouse_dumper.bulk(operations) @@ -113,8 +113,8 @@ class WarehouseLambda end it "logs structured information about received batch and dumped files" do - widget_op = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, "json_schema_version" => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) - component_op = new_primary_indexing_operation({"type" => "Component", "id" => "c1", "version" => 2, "json_schema_version" => 1, "record" => {"id" => "c1", "created_at" => "2024-09-15T12:30:12Z"}}) + widget_op = new_primary_indexing_operation({"type" => "Widget", "id" => "1", "version" => 3, SCHEMA_VERSION_KEY => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"}}) + component_op = new_primary_indexing_operation({"type" => "Component", "id" => "c1", "version" => 2, SCHEMA_VERSION_KEY => 1, "record" => {"id" => "c1", "created_at" => "2024-09-15T12:30:12Z"}}) operations = [widget_op, component_op] warehouse_dumper.bulk(operations) @@ -127,13 +127,13 @@ class WarehouseLambda a_hash_including({ "s3_bucket" => s3_bucket_name, "type" => "Widget", - "json_schema_version" => 1, + SCHEMA_VERSION_KEY => 1, "record_count" => 1 }), a_hash_including({ "s3_bucket" => s3_bucket_name, "type" => "Component", - "json_schema_version" => 1, + SCHEMA_VERSION_KEY => 1, "record_count" => 1 }) ] @@ -183,7 +183,7 @@ class WarehouseLambda "type" => "Widget", "id" => "1", "version" => 3, - "json_schema_version" => 1, + SCHEMA_VERSION_KEY => 1, "record" => {"id" => "1", "dayOfWeek" => "MON", "created_at" => "2024-09-15T12:30:12Z", "workspace_id" => "ws-1"} }) diff --git a/spec_support/lib/elastic_graph/spec_support/builds_indexer_operation.rb b/spec_support/lib/elastic_graph/spec_support/builds_indexer_operation.rb index b9e7356b9..be32fb4af 100644 --- a/spec_support/lib/elastic_graph/spec_support/builds_indexer_operation.rb +++ b/spec_support/lib/elastic_graph/spec_support/builds_indexer_operation.rb @@ -32,7 +32,7 @@ def new_primary_indexing_operation(event, index_def: nil, idxr: indexer) Indexer::Operation::Update.new( event: event, - prepared_record: idxr.record_preparer_factory.for_latest_json_schema_version.prepare_for_index( + prepared_record: idxr.record_preparer_factory.for_latest_schema_version.prepare_for_index( event.fetch("type"), event.fetch("record"), idxr.schema_artifacts.index_mappings_by_index_def_name.fetch(index_def.name).fetch("properties") diff --git a/spec_support/lib/elastic_graph/spec_support/factories.rb b/spec_support/lib/elastic_graph/spec_support/factories.rb index 9e54a399b..5d345d9a4 100644 --- a/spec_support/lib/elastic_graph/spec_support/factories.rb +++ b/spec_support/lib/elastic_graph/spec_support/factories.rb @@ -24,7 +24,7 @@ module HashAsEmbedded # Strips indexed-type-only keys, converting an indexed factory hash # to one suitable for embedding in another record. def as_embedded - except(:__version, :__json_schema_version) + except(:__version, :__schema_version) end end end @@ -94,7 +94,7 @@ module ElasticGraphSpecSupport # For tests that really care about the version, they override it to control this more tightly. __version { version_counter += 1 } __typename { raise NotImplementedError, "You must supply __typename." } - __json_schema_version { 1 } + __schema_version { 1 } id { Faker::Alphanumeric.alpha(number: 20) } end diff --git a/spec_support/lib/elastic_graph/spec_support/uses_datastore.rb b/spec_support/lib/elastic_graph/spec_support/uses_datastore.rb index a2f0ba795..3102d5042 100644 --- a/spec_support/lib/elastic_graph/spec_support/uses_datastore.rb +++ b/spec_support/lib/elastic_graph/spec_support/uses_datastore.rb @@ -331,7 +331,7 @@ def index_into(indexer, *records) destination_index_mapping = indexer.schema_artifacts.index_mappings_by_index_def_name.fetch(index_def.name) ElasticGraph::Indexer::Operation::Update.new( event: event, - prepared_record: indexer.record_preparer_factory.for_latest_json_schema_version.prepare_for_index( + prepared_record: indexer.record_preparer_factory.for_latest_schema_version.prepare_for_index( event.fetch("type"), event.fetch("record"), destination_index_mapping.fetch("properties")