Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion tests/test_coverage_gaps2.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,14 @@ def test_process_record_excludes_unique_id_from_payload():


def test_process_record_exception_returns_none():
    """process_record returns None when unique-ID mapping raises."""
    # _map_unique_id moved to RecordProcessor (P5c); process_record delegates
    # to it, so patch it there to force the exception path.
    from tracebloc_ingestor.ingestors.record_processor import RecordProcessor

    ing = make_ingestor(category=None)
    # Patch the class, not the ingestor instance: the method is no longer an
    # attribute of the ingestor, and patch.object refuses to patch an
    # attribute that does not exist on its target.
    with patch.object(
        RecordProcessor, "_map_unique_id", side_effect=RuntimeError("boom")
    ):
        assert ing.process_record({"a": "1"}) is None


Expand Down
59 changes: 35 additions & 24 deletions tests/test_label_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
PASSTHROUGH,
)


# ---------------------------------------------------------------------------
# Pure function: apply()
# ---------------------------------------------------------------------------


class TestPassthroughPolicy:
def test_string_value_unchanged(self):
assert label_policy.apply("benign", PASSTHROUGH) == "benign"
Expand Down Expand Up @@ -96,20 +96,24 @@ def test_unknown_policy_raises():
# can construct BaseIngestor subclass-shape directly and exercise
# _map_unique_id which is where the label-policy hook lives.

class _TestIngestor:
    """Lightweight double exposing only what ``_map_unique_id`` reads.

    Skips ``BaseIngestor.__init__`` (which calls ``Database.create_table``)
    by borrowing the method from the class and binding it to this object.
    """

    def __init__(self, label_column, label_policy_value, intent="train"):
        from tracebloc_ingestor.ingestors.base import BaseIngestor

        # Configuration attributes consulted by _map_unique_id.
        self.unique_id_column = None  # → UUID generation
        self.annotation_column = None
        self.intent = intent
        self.label_policy = label_policy_value
        self.label_column = label_column
        # Turn the plain function into a bound method so `self` resolves to
        # this stand-in when the tests call it.
        self._map_unique_id = BaseIngestor._map_unique_id.__get__(self)
def _TestIngestor(label_column, label_policy_value, intent="train"):
    """Build a RecordProcessor set up for the label-policy tests.

    The label-policy hook lives in ``_map_unique_id``, which P5c moved from
    BaseIngestor to RecordProcessor. Constructing the processor directly lets
    the tests reach it with no database and no ``BaseIngestor.__init__``.
    """
    from tracebloc_ingestor.ingestors.record_processor import RecordProcessor

    processor_config = {
        "schema": {},  # _map_unique_id doesn't read schema
        "intent": intent,
        "label_column": label_column,
        "annotation_column": None,
        "unique_id_column": None,  # → UUID generation
        "label_policy": label_policy_value,
        "category": None,
        "ingestor_id": "test",
    }
    return RecordProcessor(**processor_config)


def test_base_ingestor_passthrough_does_not_mutate_label():
Expand Down Expand Up @@ -150,6 +154,7 @@ def test_base_ingestor_bucket_missing_label_uses_sentinel():
# Entrypoint integration: regression YAML config flows through with bucket
# ---------------------------------------------------------------------------


def test_entrypoint_passes_bucket_policy_for_regression(tmp_path, monkeypatch):
"""End-to-end-ish: a tabular_regression YAML reaches CSVIngestor with
label_policy='bucket' as kwarg, regardless of resolver internals."""
Expand All @@ -159,11 +164,13 @@ def test_entrypoint_passes_bucket_policy_for_regression(tmp_path, monkeypatch):
examples_dir = Path(__file__).resolve().parent.parent / "examples" / "yaml"
monkeypatch.setenv("INGEST_CONFIG", str(examples_dir / "tabular_regression.yaml"))

with patch("tracebloc_ingestor.cli.run.Config") as mock_config_cls, \
patch("tracebloc_ingestor.cli.run.Database"), \
patch("tracebloc_ingestor.cli.run.APIClient"), \
patch("tracebloc_ingestor.cli.run.CSVIngestor") as mock_csv_cls, \
patch("tracebloc_ingestor.cli.run.setup_logging"):
with patch("tracebloc_ingestor.cli.run.Config") as mock_config_cls, patch(
"tracebloc_ingestor.cli.run.Database"
), patch("tracebloc_ingestor.cli.run.APIClient"), patch(
"tracebloc_ingestor.cli.run.CSVIngestor"
) as mock_csv_cls, patch(
"tracebloc_ingestor.cli.run.setup_logging"
):
mock_config = MagicMock()
mock_config.BATCH_SIZE = 4000
mock_config_cls.return_value = mock_config
Expand All @@ -174,6 +181,7 @@ def test_entrypoint_passes_bucket_policy_for_regression(tmp_path, monkeypatch):
mock_csv_cls.return_value = instance

from tracebloc_ingestor.cli.run import main

rc = main()

assert rc == 0
Expand All @@ -189,11 +197,13 @@ def test_entrypoint_passes_passthrough_policy_for_classification(tmp_path, monke
examples_dir = Path(__file__).resolve().parent.parent / "examples" / "yaml"
monkeypatch.setenv("INGEST_CONFIG", str(examples_dir / "image_classification.yaml"))

with patch("tracebloc_ingestor.cli.run.Config") as mock_config_cls, \
patch("tracebloc_ingestor.cli.run.Database"), \
patch("tracebloc_ingestor.cli.run.APIClient"), \
patch("tracebloc_ingestor.cli.run.CSVIngestor") as mock_csv_cls, \
patch("tracebloc_ingestor.cli.run.setup_logging"):
with patch("tracebloc_ingestor.cli.run.Config") as mock_config_cls, patch(
"tracebloc_ingestor.cli.run.Database"
), patch("tracebloc_ingestor.cli.run.APIClient"), patch(
"tracebloc_ingestor.cli.run.CSVIngestor"
) as mock_csv_cls, patch(
"tracebloc_ingestor.cli.run.setup_logging"
):
mock_config = MagicMock()
mock_config.BATCH_SIZE = 4000
mock_config_cls.return_value = mock_config
Expand All @@ -204,6 +214,7 @@ def test_entrypoint_passes_passthrough_policy_for_classification(tmp_path, monke
mock_csv_cls.return_value = instance

from tracebloc_ingestor.cli.run import main

main()

_, kwargs = mock_csv_cls.call_args
Expand Down
209 changes: 22 additions & 187 deletions tracebloc_ingestor/ingestors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
from sqlalchemy.orm import Session
from sqlalchemy.engine import Engine
import logging
import pandas as pd
from tqdm import tqdm
import uuid

from ..database import Database
from ..api.client import APIClient
from ..utils.constants import (
Intent,
TaskCategory,
RESET,
BOLD,
GREEN,
Expand All @@ -24,6 +21,7 @@
from ..file_transfer import map_file_transfer
from ..reporting import ConsoleRenderer
from . import preflight
from .record_processor import RecordProcessor
from .table_lock import TableLock

# Per-category behavior flags now live in the ModalityRegistry (the single
Expand Down Expand Up @@ -220,193 +218,30 @@ def __init__(
self.table = None
self._table_schema = table_schema

def _map_unique_id(
    self, record: Dict[str, Any], cleaned_record: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Map the label, intent, annotation and unique ID from the source record
    onto the cleaned record.

    Args:
        record: Original record with all fields
        cleaned_record: Processed record with schema fields; updated in place

    Returns:
        Updated cleaned record if valid, None if the intent is invalid or the
        configured unique ID column holds no usable value
    """

    # validate intent is valid
    if not self.intent or self.intent not in Intent.get_all_intents():
        logger.warning(
            f"Invalid intent: {self.intent}. Must be one of: {Intent.get_all_intents()}"
        )
        return None

    # Validate that the configured label / annotation columns exist. The
    # names of the missing ones are collected so the summary warning can
    # report them (a bare flag would only ever print "True").
    columns_to_validate = [
        (self.label_column, "label_column"),
        (self.annotation_column, "annotation_column"),
    ]
    missing_columns = []
    for column, column_name in columns_to_validate:
        if column and column not in record:
            logger.warning(
                f"Specified {column_name} '{column}' not found in record"
            )
            missing_columns.append(column)

    # A missing column is reported but does not reject the record; the
    # corresponding value is simply read as None below.
    if missing_columns:
        logger.warning(
            f"Record {record} does not contain the required columns: {missing_columns}"
        )

    if self.label_column:
        # Apply the configured label policy at the latest possible moment
        # before the API client builds its payload. For classification-class
        # categories ``label_policy="passthrough"`` is a no-op; for
        # regression-class categories ``"bucket"`` replaces the raw target
        # with a stable hash-bucket ID so the value never leaks to the
        # central backend (#44 / parent client#85).
        #
        # Coerce numpy / pandas scalar types to native Python before the
        # policy runs. After the INT-cast switch to nullable ``Int64``,
        # itertuples yields ``numpy.int64`` (the old ``downcast='integer'``
        # incidentally produced plain ``int``) — and mysql-connector-python
        # refuses to bind numpy scalars, failing the passthrough path with
        # "Python type numpy.int64 cannot be converted" on every row of any
        # INT label column (tabular_classification on the e2e job). The
        # other policies (e.g. ``bucket``) stringify their output so they
        # never hit this; the fix lives here so passthrough also yields a
        # binder-friendly value.
        label_val = record.get(self.label_column)
        if hasattr(label_val, "item") and not isinstance(label_val, str):
            try:
                label_val = label_val.item()
            except (ValueError, AttributeError):
                # Best effort: keep the original value if it cannot be
                # unwrapped into a native scalar.
                pass
        # Strip surrounding whitespace from string label values before
        # the policy runs — protects against silent label-set
        # corruption (issue #261) where ``" A "`` and ``"A"`` would
        # otherwise land as distinct classes in MySQL. A user
        # copy-pasting from Excel / another tool routinely has
        # whitespace they can't see; the framework's contract for
        # the label column is "the class identifier", and class
        # identifiers don't carry whitespace semantics. The strip
        # mirrors what the framework already does for the
        # ``data_id`` column (further down) and for column headers
        # (``chunk.columns.str.strip()`` in csv_ingestor).
        #
        # Non-string labels (INT class IDs, BIOLabelValidator's
        # space-separated tags, etc.) pass through unchanged.
        if isinstance(label_val, str):
            label_val = label_val.strip()
        cleaned_record["label"] = label_policy_module.apply(
            label_val, self.label_policy
        )

    # The intent was validated above, so it is always set at this point.
    cleaned_record["data_intent"] = self.intent

    if self.annotation_column:
        cleaned_record["annotation"] = record.get(self.annotation_column)

    if not self.unique_id_column:
        # No unique ID column configured: generate a fresh UUID instead.
        cleaned_record["data_id"] = str(uuid.uuid4())
        return cleaned_record

    unique_id = record.get(self.unique_id_column)
    if unique_id is not None and str(unique_id).strip():
        cleaned_record["data_id"] = str(unique_id).strip()
        return cleaned_record

    # The configured ID column is absent, None, or blank: reject the record.
    logger.warning(f"Missing or invalid unique ID for record: {record}")
    return None
@property
def _record_processor(self) -> RecordProcessor:
    """Per-record transform collaborator for this ingestor (P5c).

    Assembled from the run's schema, column, label, intent and category
    settings. A new instance is created on each access, which is fine
    because it only holds those references. ``process_record`` delegates
    to the object returned here.
    """
    processor_kwargs = dict(
        schema=self.schema,
        intent=self.intent,
        label_column=self.label_column,
        annotation_column=self.annotation_column,
        unique_id_column=self.unique_id_column,
        label_policy=self.label_policy,
        category=self.category,
        ingestor_id=self.ingestor_id,
    )
    return RecordProcessor(**processor_kwargs)

def process_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process a single record"""
try:
# Clean data according to schema, excluding label_column, annotation_column, and unique_id_column
# These are handled separately and should not be ingested as regular columns
columns_to_exclude = set()
if self.label_column:
columns_to_exclude.add(self.label_column)
if self.annotation_column:
columns_to_exclude.add(self.annotation_column)
if self.unique_id_column:
columns_to_exclude.add(self.unique_id_column)
# Preserve missing-data semantics: any null-like value becomes
# Python None so the DB binder writes SQL NULL. Treats four
# representations uniformly:
# - Python None (explicit absence, JSON null)
# - float NaN / pd.NaT (from pd.read_csv / pd.to_datetime)
# - pd.NA (from pandas StringDtype after #172)
# - literal "" string (JSON empty string — JSONIngestor reads
# via json.load, not pd.read_json, so ""
# survives to here; CSVs never hit this
# case because keep_default_na=True turns
# "" into NaN at read time)
# Mirrors the missing-data convention in
# JSONIngestor._validate_record (#170): `value is None or
# value == ""`. pd.isna returns False for ordinary
# strings/numbers/bools so existing values aren't touched.
# Booleans must NOT be stringified — mysql-connector-python writes
# True/False directly as TINYINT 1/0, but `str(True)` is the
# four-character string "True", which MySQL rejects against a BOOL
# column with `Incorrect integer value: 'True' for column 'active'
# at row 1`. This must catch BOTH Python `bool` AND `numpy.bool_`:
# a CSV BOOL column comes back from pandas/itertuples as numpy.bool_,
# and `isinstance(np.True_, bool)` is False — so the previous
# `isinstance(v, bool)` check missed it and every CSV boolean was
# stringified to "True"/"False" and rejected by MySQL. `is_bool`
# covers both; convert to a plain Python bool so the binder writes
# 1/0. Checked FIRST so a bool never reaches the `v == ""` compare
# (numpy scalar-vs-str comparison would warn) and pd.NA (is_bool
# False) falls through to the null branch. The rest of the pipeline
# expects strings, so everything non-bool/non-null is stringified.
cleaned_record = {
k.strip(): (
bool(v)
if pd.api.types.is_bool(v)
else None if pd.isna(v) or v == "" else str(v).strip()
)
for k, v in record.items()
if k in self.schema and k not in columns_to_exclude
}
# Map unique ID if specified
cleaned_record = self._map_unique_id(record, cleaned_record)

logger.info(f"Cleaned record: {cleaned_record}")

if cleaned_record is None:
return None

# Add ingestor_id to the record
cleaned_record["ingestor_id"] = self.ingestor_id
cleaned_record["filename"] = record.get("filename")
cleaned_record["extension"] = record.get("extension")
# Preserve mask_id for semantic_segmentation ONLY. The
# cleaned_record comprehension above filters by ``k in
# self.schema``, but for the documented 8-line schema-less
# example yaml that filter drops every CSV column including
# mask_id — which file_transfer.py:401 needs to locate the
# per-row mask file. Without this, every record was skipped at
# file-transfer with "No mask_id found in record" despite
# #207's FilePairingValidator pass.
#
# Scoped to SEMANTIC_SEGMENTATION because mask_id is a runtime
# indirection only — there's no `mask_id` column on the
# standard tracebloc table (see database.py:standard_columns),
# so putting it on every category's cleaned_record would break
# SQL inserts on tables that don't have it (#212 bugbot).
# _process_batch additionally pops it before insert so even
# the semseg path doesn't try to bind it as a column.
if self.category == TaskCategory.SEMANTIC_SEGMENTATION:
cleaned_record["mask_id"] = record.get("mask_id")
return cleaned_record
"""Process a single record into the cleaned, DB-ready dict.

except Exception as e:
logger.error(f"Error processing record: {str(e)}")
return None
Delegates to the :class:`RecordProcessor` collaborator (structural
refactor P5c); kept as the ingestor's method since the ingest loop and
tests call ``self.process_record`` / ``ing.process_record``.
"""
return self._record_processor.process(record)

@property
def _table_lock(self) -> TableLock:
Expand Down
Loading
Loading