Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions tests/test_ingestor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def test_summary_clean_run_no_failures():
@pytest.mark.parametrize("kwargs", [
dict(failed_records=1),
dict(file_transfer_failures=1),
dict(skipped_records=1), # a record dropped during processing (#234)
dict(inserted_records=9), # < total
dict(api_sent_records=9), # < inserted
])
Expand Down Expand Up @@ -428,17 +429,63 @@ def test_ingest_still_calls_edge_label_for_label_carrying_categories():
ing.api_client.send_generate_edge_label_meta.assert_called_once()


def test_ingest_skips_records_that_fail_processing():
# invalid intent -> process_record returns None -> counted as skipped
def test_ingest_fails_fast_on_invalid_intent():
# A bad intent is a run-wide config error, not a per-row skip: it must
# abort loudly before any DB work (#234), not silently skip every record
# and exit 0 with an empty dataset and a Job marked Succeeded.
records = [{"a": "1", "filename": "f1"}]
ing = make_ingestor(records=records, category=None, intent="bogus")
with pytest.raises(ValueError, match="intent"):
ing.ingest("src", batch_size=10)
ing.database.insert_batch.assert_not_called()


def test_ingest_counts_dropped_record_as_failure():
# A record dropped during processing (here: a missing unique_id when
# unique_id_column is set) must be surfaced as a failed record so the run
# exits non-zero — not silently skipped with a clean exit 0 (#234). The
# dropped record never reaches the DB.
records = [{"a": "1", "filename": "f1"}] # no 'uid' -> _map_unique_id drops it
ing = make_ingestor(
records=records, category=None, intent="train", unique_id_column="uid"
)
with patch.object(base_mod, "Session") as Sess:
Sess.return_value.__enter__.return_value = MagicMock()
failed = ing.ingest("src", batch_size=10)
assert failed == []
assert len(failed) == 1
assert failed[0]["error"] == "record_dropped_in_processing"
ing.database.insert_batch.assert_not_called()


def test_ingest_keeps_good_records_and_counts_dropped():
# The canonical #234 scenario: a mixed run. The good record ingests; the
# dropped one (blank unique_id) is surfaced as a failure, the summary
# records the drop and trips has_failures — so a partial run can NOT be
# reported as a clean success (the "0 failures / most records failed"
# contradiction).
records = [{"a": "1", "uid": "x1"}, {"a": "2", "uid": " "}] # 2nd: blank uid
ing = make_ingestor(
records=records, category=None, intent="train", unique_id_column="uid"
)
captured = {}
real_log = BaseIngestor._log_summary

def spy(self, summary):
captured["summary"] = summary
return real_log(self, summary)

with patch.object(base_mod, "Session") as Sess, \
patch.object(BaseIngestor, "_log_summary", spy):
Sess.return_value.__enter__.return_value = MagicMock()
failed = ing.ingest("src", batch_size=10)

summary = captured["summary"]
assert summary.skipped_records == 1
assert summary.has_failures is True
assert any(f["error"] == "record_dropped_in_processing" for f in failed)
ing.database.insert_batch.assert_called() # the good record reached the DB


def test_ingest_reraises_on_session_error():
records = [{"a": "1", "filename": "f1"}]
ing = make_ingestor(records=records, category=None)
Expand Down
20 changes: 13 additions & 7 deletions tests/test_json_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,25 @@ def test_read_data_invalid_top_level_type_raises(tmp_path):
list(ing.read_data(str(p)))


def test_read_data_skips_non_dict_items(tmp_path):
def test_read_data_non_dict_item_raises(tmp_path):
# A non-object array item is malformed data: fail fast instead of silently
# skipping it (which exited 0 on a partial ingest — #234) and diverging
# from CSV/the gate, which abort on a bad record (#235).
p = _write_json(tmp_path, [{"a": 1}, "not-a-dict", {"a": 2}])
ing = make_json_ingestor(schema={"a": "INT"})
records = list(ing.read_data(str(p)))
assert len(records) == 2
with pytest.raises(ValueError, match="not an object"):
list(ing.read_data(str(p)))


def test_read_data_skips_records_failing_validation(tmp_path):
# record missing unique_id_column -> _validate_record raises -> skipped
def test_read_data_record_failing_validation_raises(tmp_path):
# A record missing the configured unique_id_column makes _validate_record
# raise; that now propagates (fail-fast) instead of silently skipping the
# record and reporting a partial ingest as success (#234), consistent with
# the CSV cast and the DataValidator gate (#235).
p = _write_json(tmp_path, [{"a": 1}, {"a": 2, "uid": "x"}])
ing = make_json_ingestor(schema={"a": "INT"}, unique_id_column="uid")
records = list(ing.read_data(str(p)))
assert records == [{"a": 2, "uid": "x"}]
with pytest.raises(ValueError, match="uid"):
list(ing.read_data(str(p)))


def test_read_data_malformed_json_raises(tmp_path):
Expand Down
63 changes: 54 additions & 9 deletions tracebloc_ingestor/ingestors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ class IngestionSummary(NamedTuple):
def has_failures(self) -> bool:
"""True if any non-trivial failure occurred — DB insert short of
total, API short of inserted, file-transfer skipped any record,
or processing errored. Used to gate the "completed successfully"
a record was dropped during processing (skipped_records), or
processing errored. Used to gate the "completed successfully"
banner so customers can't mistake a partial run for a clean one.
"""
return (
self.failed_records > 0
or self.file_transfer_failures > 0
or self.skipped_records > 0
or self.inserted_records < self.total_records
or self.api_sent_records < self.inserted_records
)
Expand Down Expand Up @@ -710,6 +712,24 @@ def _count_records(self, source: Any) -> Optional[int]:
logger.debug(f"Unable to count records: {str(e)}")
return None

def _check_intent(self) -> None:
"""Fail fast on a missing/invalid ``intent`` before any lock, DB, or
row work (#234).

``intent`` is a single run-wide config value, not per-row data. When
it was wrong (e.g. a ``"trian"`` typo), ``_map_unique_id`` returned
None for EVERY record, so every row was silently skipped and — before
#234 — the run still exited 0 with an empty dataset and a Job marked
Succeeded. A config error must abort loudly, not masquerade as N
per-row skips.
"""
if not self.intent or self.intent not in Intent.get_all_intents():
raise ValueError(
f"{RED}Invalid intent {self.intent!r}. Must be one of "
f"{Intent.get_all_intents()}. This is a configuration error — "
f"set 'intent: train' or 'intent: test' in your config.{RESET}"
)

def ingest(self, source: Any, batch_size: int = 50) -> List[Dict[str, Any]]:
"""
Ingest data from the source with progress tracking
Expand All @@ -732,6 +752,9 @@ def ingest(self, source: Any, batch_size: int = 50) -> List[Dict[str, Any]]:
# including ones the inner ``except Exception`` doesn't catch
# (Session() construction failure, _count_records exceptions,
# KeyboardInterrupt, etc.).
# Fail fast on a config error (bad intent) before acquiring a table
# lock or touching the DB — it would otherwise skip every row (#234).
self._check_intent()
_lock_path = self._acquire_table_lock()
try:
return self._ingest_with_lock(source, batch_size)
Expand Down Expand Up @@ -840,7 +863,24 @@ def _ingest_with_lock(
pbar.update(len(batch))
batch = []
else:
# process_record returned None: the record was
# dropped (blank/invalid unique_id, or an error
# inside process_record). Count it AND surface it
# as a failed record so the run exits non-zero —
# a dropped record is silent data loss, not a
# clean skip. Before #234 these reached only
# skipped_records, never failed_records, so
# run_ingestion returned [] and the K8s Job was
# marked Succeeded despite losing rows. The
# specific reason was already logged by
# process_record / _map_unique_id.
stats["skipped_records"] += 1
failed_records.append(
{
"record": record,
"error": "record_dropped_in_processing",
}
)
pbar.update(1) # Update progress bar for skipped records
except Exception as e:
# Count processing errors (including missing columns) as failed records
Expand Down Expand Up @@ -1160,17 +1200,22 @@ def _log_summary(self, summary: IngestionSummary):
f"{BOLD}📊 Success Rate:{RESET} [{status_color}{bar}{RESET}] {status_color}{success_rate:.1f}%{RESET}"
)

# Status banner. Any non-trivial failure (DB, API, or file-transfer)
# disqualifies the "completed successfully" message — a customer
# seeing 🎉 should be able to trust that no record was silently
# dropped. The three failure channels are mutually exclusive per
# record (file-transfer failures never reach DB; DB failures never
# reach API; api_only_failures are records that hit DB but didn't
# ship), so summing them gives a clean unique count instead of
# the double-count `total_records - api_sent_records` would produce.
# Status banner. Any non-trivial failure (DB, API, file-transfer, or a
# record dropped during processing) disqualifies the "completed
# successfully" message — a customer seeing 🎉 should be able to trust
# that no record was silently dropped. The four failure channels are
# mutually exclusive per record (a dropped record never reaches
# file-transfer; file-transfer failures never reach DB; DB failures
# never reach API; api_only_failures are records that hit DB but didn't
# ship), so summing them gives a clean unique count instead of the
# double-count `total_records - api_sent_records` would produce.
# ``skipped_records`` MUST be included or the count contradicts the
# severity text — a run that drops most rows printed "0 failure(s)"
# while success_rate said "most records failed" (#234).
total_failures = (
summary.failed_records
+ summary.file_transfer_failures
+ summary.skipped_records
+ api_only_failures
)
if not summary.has_failures:
Expand Down
29 changes: 17 additions & 12 deletions tracebloc_ingestor/ingestors/json_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,21 +371,26 @@ def _iter_validated_records(
single-object and array-streaming paths in ``read_data``. Factored
out so the array path can ``yield from`` it inside the ``with
open(...)`` block — the file handle stays open exactly as long as
the generator is being consumed."""
the generator is being consumed.

A type-invalid or malformed record RAISES (fail-fast); it is NOT
silently skipped. Skipping reported a partial ingest as success and
exited 0 (#234), and it diverged from the CSV cast and the
DataValidator gate, which both abort on a bad record. Raising makes
JSON behave consistently with CSV (#235): a bad record aborts the
run with a clear, non-zero exit instead of vanishing from the
dataset with only a log line nobody durably sees."""
for record in records:
if not isinstance(record, dict):
logger.warning(
f"{YELLOW}Skipping invalid record: {record}{RESET}"
)
continue
try:
self._validate_record(record)
yield record # Let base class handle the cleaning and unique ID mapping
except ValueError as e:
logger.warning(
f"{YELLOW}Skipping invalid record: {str(e)}{RESET}"
raise ValueError(
f"{RED}JSON record is not an object: {record!r}. Every item "
f"in a JSON array must be an object mapping column names to "
f"values.{RESET}"
)
continue
# _validate_record raises ValueError on a type-invalid record;
# let it propagate (fail-fast) rather than skip-and-continue.
self._validate_record(record)
yield record # base handles cleaning + unique-ID mapping

def _count_records(self, file_path: str) -> Optional[int]:
"""Count total records in JSON file without materialising it.
Expand Down
Loading