diff --git a/ami/main/admin.py b/ami/main/admin.py index f0a130577..b2943a08f 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -13,6 +13,9 @@ from ami import tasks from ami.jobs.models import Job from ami.ml.models.project_pipeline_config import ProjectPipelineConfig +from ami.ml.post_processing.admin.actions import make_post_processing_action +from ami.ml.post_processing.admin.small_size_filter_form import SmallSizeFilterActionForm +from ami.ml.post_processing.small_size_filter import SmallSizeFilterTask from ami.ml.tasks import remove_duplicate_classifications from .models import ( @@ -456,6 +459,19 @@ def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: def detections_count(self, obj) -> int: return obj.detections_count + # Per-occurrence post-processing trigger. Same factory as the capture-set + # action on SourceImageCollectionAdmin, scoped to one occurrence — the fast + # spot/dev path for iterating on a filter without running a whole collection. + # New per-occurrence tasks add their own action here the same way. 
+ run_small_size_filter = make_post_processing_action( + SmallSizeFilterTask, + SmallSizeFilterActionForm, + scope_resolver=lambda occurrence: {"occurrence_id": occurrence.pk}, + name_resolver=lambda task_cls, occurrence: (f"Post-processing: {task_cls.name} on Occurrence {occurrence.pk}"), + ) + + actions = [run_small_size_filter] + ordering = ("-created_at",) # Add classifications as inline @@ -694,25 +710,18 @@ def populate_collection_async(self, request: HttpRequest, queryset: QuerySet[Sou f"Populating {len(queued_tasks)} capture set(s) background tasks: {queued_tasks}.", ) - @admin.action(description="Run Small Size Filter post-processing task (async)") - def run_small_size_filter(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: - jobs = [] - for collection in queryset: - job = Job.objects.create( - name=f"Post-processing: SmallSizeFilter on Capture Set {collection.pk}", - project=collection.project, - job_type_key="post_processing", - params={ - "task": "small_size_filter", - "config": { - "source_image_collection_id": collection.pk, - }, - }, - ) - job.enqueue() - jobs.append(job.pk) - - self.message_user(request, f"Queued Small Size Filter for {queryset.count()} capture set(s). Jobs: {jobs}") + # Built from the shared post-processing action factory: renders an intermediate + # confirmation page with the task's knob form, validates each selection against + # SmallSizeFilterConfig, then enqueues one Job per capture set. New post-processing + # tasks declare their own trigger the same way (task class + form + scope_resolver). 
+ run_small_size_filter = make_post_processing_action( + SmallSizeFilterTask, + SmallSizeFilterActionForm, + scope_resolver=lambda collection: {"source_image_collection_id": collection.pk}, + name_resolver=lambda task_cls, collection: ( + f"Post-processing: {task_cls.name} on Capture Set {collection.pk}" + ), + ) actions = [ populate_collection, diff --git a/ami/ml/post_processing/admin/__init__.py b/ami/ml/post_processing/admin/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ami/ml/post_processing/admin/actions.py b/ami/ml/post_processing/admin/actions.py new file mode 100644 index 000000000..fb99b474b --- /dev/null +++ b/ami/ml/post_processing/admin/actions.py @@ -0,0 +1,287 @@ +"""Shared admin-action machinery for triggering post-processing tasks. + +Every post-processing task surfaces the same admin flow: + +1. The operator selects rows and picks the action. +2. An intermediate confirmation page renders the task's knob form. +3. On submit, each row's config is validated against the task's pydantic + ``config_schema`` and a Job is enqueued. + +``make_post_processing_action`` builds the action callable for that flow so +each task only declares what varies: its task class, its knob form, and how a +selected row maps to a Job (scope + project + name). Tasks whose row→Job +mapping doesn't fit the default one-Job-per-row shape (e.g. partitioning events +across projects) pass their own ``build_jobs`` callable. + +Validation lives in one place: the task's ``config_schema``. The knob form only +declares fields (label, help text, widget); it does not re-encode the schema's +rules. Schema errors raised while building Jobs are mapped back onto the form so +the operator sees them inline on the confirmation page. 
+""" +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import Any, Protocol + +import pydantic +from django.contrib import admin, messages +from django.db import transaction +from django.db.models import Model +from django.db.models.query import QuerySet +from django.http import HttpRequest, HttpResponse +from django.template.response import TemplateResponse +from django.urls import reverse + +from ami.jobs.models import Job +from ami.ml.post_processing.admin.forms import BasePostProcessingActionForm +from ami.ml.post_processing.base import BasePostProcessingTask + +logger = logging.getLogger(__name__) + +CONFIRMATION_TEMPLATE = "admin/post_processing/confirmation.html" + + +class _ModelAdminProto(Protocol): + """The slice of ``ModelAdmin`` the generic action touches.""" + + model: type[Model] + admin_site: Any + + def message_user(self, request: HttpRequest, message: str, level: Any = ..., **kwargs: Any) -> None: + ... + + +class ConfigValidationErrors(Exception): + """Raised by ``build_jobs`` when one or more rows produce invalid config. + + Carries ``(field_name_or_None, message)`` pairs so the caller can attach + them to the knob form and re-render the confirmation page instead of + creating any Jobs. + """ + + def __init__(self, errors: list[tuple[str | None, str]]): + self.errors = errors + super().__init__(f"{len(errors)} invalid config(s)") + + +def _schema_errors_to_form_fields( + exc: pydantic.ValidationError, + form_field_names: set[str], +) -> list[tuple[str | None, str]]: + """Map a pydantic ``ValidationError`` onto form field names where possible. + + Errors on a field the form renders are attached to that field; everything + else (e.g. an injected scope field) becomes a non-field error. 
+ """ + mapped: list[tuple[str | None, str]] = [] + for err in exc.errors(): + loc = err.get("loc") or () + field = str(loc[0]) if loc else None + target = field if field in form_field_names else None + mapped.append((target, err.get("msg", "Invalid value"))) + return mapped + + +def default_build_jobs( + *, + model_admin: _ModelAdminProto, + request: HttpRequest, + config: dict[str, Any], + queryset: QuerySet, + task_cls: type[BasePostProcessingTask], + form_field_names: set[str], + scope_resolver: Callable[[Any], dict[str, Any]], + project_resolver: Callable[[Any], Any], + name_resolver: Callable[[type[BasePostProcessingTask], Any], str], +) -> list[int]: + """Validate every selected row, then enqueue one Job per row (all-or-nothing). + + Each row's full config is ``{**config, **scope_resolver(row)}`` validated + against ``task_cls.config_schema``. If any row fails, nothing is created and + ``ConfigValidationErrors`` is raised so the form can re-render with the + errors inline. + """ + validated: list[tuple[Any, pydantic.BaseModel]] = [] + errors: list[tuple[str | None, str]] = [] + + for obj in queryset: + full_config = {**config, **scope_resolver(obj)} + try: + validated.append((obj, task_cls.config_schema(**full_config))) + except pydantic.ValidationError as exc: + errors.extend(_schema_errors_to_form_fields(exc, form_field_names)) + + if errors: + raise ConfigValidationErrors(errors) + + # Create all Jobs in one transaction so the operation stays all-or-nothing even + # if a create fails mid-loop. (Admin requests are already atomic via + # ATOMIC_REQUESTS, but this helper may also be called outside a request — e.g. a + # management command — where there's no ambient transaction.) Job.enqueue() uses + # transaction.on_commit, so enqueues fire only once the block commits. 
+ job_pks: list[int] = [] + with transaction.atomic(): + for obj, model in validated: + job = Job.objects.create( + name=name_resolver(task_cls, obj), + project=project_resolver(obj), + job_type_key="post_processing", + params={"task": task_cls.key, "config": model.dict()}, + ) + job.enqueue() + job_pks.append(job.pk) + return job_pks + + +def render_confirmation( + model_admin: _ModelAdminProto, + request: HttpRequest, + queryset: QuerySet, + *, + task_cls: type[BasePostProcessingTask], + form: BasePostProcessingActionForm, + action_name: str, + title: str, + submit_label: str, +) -> TemplateResponse: + """Render the shared intermediate confirmation page for ``task_cls``.""" + opts = model_admin.model._meta + # Resolve the selection once; count from the materialized list (one query, not two). + selected_pks = [str(pk) for pk in queryset.values_list("pk", flat=True)] + return TemplateResponse( + request, + CONFIRMATION_TEMPLATE, + { + **model_admin.admin_site.each_context(request), + "title": title, + "task_label": task_cls.name, + "form": form, + "selected_count": len(selected_pks), + "selected_pks": selected_pks, + "action_name": action_name, + "submit_label": submit_label, + "changelist_url": reverse(f"admin:{opts.app_label}_{opts.model_name}_changelist"), + "model_meta": opts, + "opts": opts, + "action_checkbox_name": admin.helpers.ACTION_CHECKBOX_NAME, + }, + ) + + +def _default_name_resolver(task_cls: type[BasePostProcessingTask], obj: Any) -> str: + return f"Post-processing: {task_cls.name} on {obj._meta.verbose_name} {obj.pk}" + + +def make_post_processing_action( + task_cls: type[BasePostProcessingTask], + form_class: type[BasePostProcessingActionForm], + *, + scope_resolver: Callable[[Any], dict[str, Any]] | None = None, + project_resolver: Callable[[Any], Any] = lambda obj: obj.project, + name_resolver: Callable[[type[BasePostProcessingTask], Any], str] = _default_name_resolver, + build_jobs: Callable[..., list[int]] | None = None, + description: str | 
None = None, + title: str | None = None, + submit_label: str | None = None, +) -> Callable[[_ModelAdminProto, HttpRequest, QuerySet], HttpResponse | None]: + """Build a Django admin action that triggers ``task_cls`` via the shared flow. + + Args: + task_cls: the post-processing task. ``key``/``name``/``config_schema`` + drive the action name, labels, and config validation. + form_class: the knob form rendered on the confirmation page. + scope_resolver: maps a selected row to the config fields identifying its + scope, e.g. ``lambda c: {"source_image_collection_id": c.pk}``. + Required unless a custom ``build_jobs`` is supplied. + project_resolver: maps a row to the Job's project (default ``obj.project``). + name_resolver: maps ``(task_cls, row)`` to the Job name. + build_jobs: escape hatch for tasks whose row→Job mapping isn't one + Job per row (e.g. partitioning across projects). Receives the same + keyword arguments as ``default_build_jobs`` and returns created Job + pks; raise ``ConfigValidationErrors`` to re-render the form. + description: admin action dropdown label. + title / submit_label: confirmation-page heading and button text. + + The returned callable's ``__name__`` is ``run_`` followed by ``task_cls.key`` + (e.g. ``run_small_size_filter``) so Django + registers it under that name and the confirmation page's hidden ``action`` + field round-trips correctly. 
+ """ + if build_jobs is None and scope_resolver is None: + raise ValueError("make_post_processing_action requires scope_resolver unless build_jobs is supplied") + + action_name = f"run_{task_cls.key}" + resolved_title = title or f"Run {task_cls.name}" + resolved_submit = submit_label or resolved_title + resolved_description = description or f"Run {task_cls.name} post-processing task (async)" + + def action( + model_admin: _ModelAdminProto, + request: HttpRequest, + queryset: QuerySet, + ) -> HttpResponse | None: + def _render(form: BasePostProcessingActionForm) -> TemplateResponse: + return render_confirmation( + model_admin, + request, + queryset, + task_cls=task_cls, + form=form, + action_name=action_name, + title=resolved_title, + submit_label=resolved_submit, + ) + + # "Select all across pages" hands us the entire filtered table as the + # queryset and would serialize every pk into hidden inputs on the + # confirmation page (a huge POST body, possibly over request limits). + # This admin trigger is for explicit, bounded selections; refuse the + # across-pages case rather than render an unbounded form. + if request.POST.get("select_across") == "1": + model_admin.message_user( + request, + f'"Select all across pages" is not supported for {task_cls.name}. ' + "Select the specific rows you want to process.", + level=messages.WARNING, + ) + return None + + if not request.POST.get("confirm"): + return _render(form_class()) + + form = form_class(request.POST) + if not form.is_valid(): + return _render(form) + + runner = build_jobs or default_build_jobs + kwargs: dict[str, Any] = dict( + model_admin=model_admin, + request=request, + config=form.to_config(), + queryset=queryset, + task_cls=task_cls, + form_field_names=set(form.fields), + project_resolver=project_resolver, + name_resolver=name_resolver, + ) + # Only forward scope_resolver when set. A custom build_jobs supplied without + # a scope_resolver should not receive a None it might try to call. 
+ if scope_resolver is not None: + kwargs["scope_resolver"] = scope_resolver + try: + job_pks = runner(**kwargs) + except ConfigValidationErrors as exc: + for field, message in exc.errors: + form.add_error(field, message) + return _render(form) + + model_admin.message_user( + request, + f"Queued {task_cls.name} for {len(job_pks)} {model_admin.model._meta.verbose_name}(s). Jobs: {job_pks}", + level=messages.SUCCESS, + ) + return None + + action.__name__ = action_name + action.__qualname__ = action_name + return admin.action(description=resolved_description)(action) diff --git a/ami/ml/post_processing/admin/forms.py b/ami/ml/post_processing/admin/forms.py new file mode 100644 index 000000000..7a2dbf5c9 --- /dev/null +++ b/ami/ml/post_processing/admin/forms.py @@ -0,0 +1,25 @@ +"""Form base for admin actions that trigger post-processing tasks. + +Each post-processing task surfaces its tunable knobs as a Django form. The +form's ``cleaned_data`` becomes the ``config`` payload on the resulting Job +(after validation against the task's pydantic ``config_schema``). + +Algorithm scope (which queryset/events/collection the action runs against) +lives outside the form because it varies per admin entry-point. +""" +from __future__ import annotations + +from django import forms + + +class BasePostProcessingActionForm(forms.Form): + """Marker base for post-processing admin action forms. + + Subclasses declare task-specific fields. Override ``to_config()`` if the + 1:1 ``cleaned_data → config`` mapping needs adjustment (e.g. drop empty + optional fields, derive computed values, rename keys). 
+ """ + + def to_config(self) -> dict: + """Return ``cleaned_data`` shaped for ``Job.params['config']``.""" + return dict(self.cleaned_data) diff --git a/ami/ml/post_processing/admin/small_size_filter_form.py b/ami/ml/post_processing/admin/small_size_filter_form.py new file mode 100644 index 000000000..955f552f5 --- /dev/null +++ b/ami/ml/post_processing/admin/small_size_filter_form.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from django import forms + +from ami.ml.post_processing.admin.forms import BasePostProcessingActionForm +from ami.ml.post_processing.small_size_filter import SmallSizeFilterConfig + + +class SmallSizeFilterActionForm(BasePostProcessingActionForm): + """Knobs surfaced when an admin triggers Small Size Filter. + + The valid range for ``size_threshold`` lives on ``SmallSizeFilterConfig`` + (the single source of truth); this form only declares the field's + presentation. The admin action validates submitted values against the + schema and surfaces any error inline on this field. + """ + + size_threshold = forms.FloatField( + label="Size threshold", + initial=SmallSizeFilterConfig.__fields__["size_threshold"].default, + help_text=( + "Minimum bounding-box area as a fraction of the source image area " + "(width x height). Detections smaller than this are flagged as " + "'Not identifiable'. Must be between 0 and 1 (exclusive). " + "Default 0.0008 ≈ 0.08% of frame area." + ), + ) diff --git a/ami/ml/post_processing/base.py b/ami/ml/post_processing/base.py index c96bd723e..863312344 100644 --- a/ami/ml/post_processing/base.py +++ b/ami/ml/post_processing/base.py @@ -1,8 +1,11 @@ import abc +import inspect import logging import typing from typing import Any, Optional +import pydantic + from ami.ml.models import Algorithm from ami.ml.models.algorithm import AlgorithmTaskType @@ -13,15 +16,26 @@ class BasePostProcessingTask(abc.ABC): """ Abstract base class for all post-processing tasks. 
+ + Subclasses must declare a Pydantic ``config_schema`` describing the shape of + ``Job.params['config']``. Config is validated at task construction so bad + payloads fail fast in worker logs (and earlier still — admin triggers and + other callers should validate via the same schema before enqueueing a Job). """ # Each task must override these key: str name: str + config_schema: type[pydantic.BaseModel] def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) - required_attrs = ["key", "name"] + # Only enforce the contract on concrete tasks. An abstract intermediary + # (e.g. a shared mixin with an unimplemented abstractmethod) may legitimately + # defer key/name/config_schema to its concrete subclasses. + if inspect.isabstract(cls): + return + required_attrs = ["key", "name", "config_schema"] for attr in required_attrs: if not hasattr(cls, attr) or getattr(cls, attr) is None: raise TypeError(f"{cls.__name__} must define '{attr}' class attribute") @@ -33,7 +47,7 @@ def __init__( **config: Any, ): self.job = job - self.config = config + self.config: pydantic.BaseModel = self.config_schema(**config) # Choose the right logger if logger is not None: self.logger = logger @@ -52,7 +66,7 @@ def __init__( ) self.algorithm: Algorithm = algorithm - self.logger.info(f"Initialized {self.name } with config={self.config}, job={job}") + self.logger.info(f"Initialized {self.name} with config={self.config.dict()}, job={job}") def update_progress(self, progress: float): """ @@ -67,6 +81,26 @@ def update_progress(self, progress: float): # No job object — fallback to plain logging self.logger.info(f"[{self.name}] Progress {progress:.0%}") + def report_stage_metrics(self, metrics: dict[str, Any]): + """Surface human-readable counters on the job's post-processing stage. + + Each ``{label: value}`` pair becomes a stage parameter visible on the Jobs + admin page (e.g. 
``{"Detections checked": 540, "Flagged": 12}``), so an + operator can see what a run examined and changed without reading the log. + Labels may contain spaces; pass them as dict keys rather than kwargs. + + Falls back to a single log line when running jobless (tests, management + commands), so the same call site works in both contexts. + """ + if not self.job: + self.logger.info(f"[{self.name}] " + ", ".join(f"{k}: {v}" for k, v in metrics.items())) + return + + stage_key = self.job.job_type_key + for label, value in metrics.items(): + self.job.progress.add_or_update_stage_param(stage_key, label, value) + self.job.save(update_fields=["progress"]) + @abc.abstractmethod def run(self) -> None: """ diff --git a/ami/ml/post_processing/small_size_filter.py b/ami/ml/post_processing/small_size_filter.py index 143b3651b..ea762ce3b 100644 --- a/ami/ml/post_processing/small_size_filter.py +++ b/ami/ml/post_processing/small_size_filter.py @@ -1,3 +1,5 @@ +import pydantic +from django.db.models import QuerySet from django.utils import timezone from ami.main.models import Classification, Detection, Occurrence, SourceImageCollection, Taxon, TaxonRank @@ -5,14 +7,65 @@ from ami.ml.schemas import BoundingBox +class SmallSizeFilterConfig(pydantic.BaseModel): + # Scope: exactly one of these identifies which detections to examine. A capture + # set is the bulk path; a single occurrence is the spot/dev path (fast feedback + # while tuning a filter). This discriminated-scope shape is the pattern other + # post-processing tasks copy when they gain per-occurrence / per-event triggers. 
+ source_image_collection_id: int | None = None + occurrence_id: int | None = None + size_threshold: float = 0.0008 + + @pydantic.validator("size_threshold") + def _threshold_in_unit_interval(cls, v: float) -> float: + if not (0.0 < v < 1.0): + raise ValueError("size_threshold must be in (0, 1) exclusive") + return v + + @pydantic.root_validator(skip_on_failure=True) + def _exactly_one_scope(cls, values: dict) -> dict: + scopes = [values.get("source_image_collection_id"), values.get("occurrence_id")] + if sum(s is not None for s in scopes) != 1: + raise ValueError("Provide exactly one of source_image_collection_id or occurrence_id") + return values + + class Config: + extra = "forbid" + + class SmallSizeFilterTask(BasePostProcessingTask): key = "small_size_filter" name = "Small size filter" + config_schema = SmallSizeFilterConfig + + def _scoped_detections(self, config: SmallSizeFilterConfig) -> tuple[QuerySet[Detection], str]: + """Resolve the detections to examine from whichever scope the config carries. + + ``config_schema`` guarantees exactly one of the scope ids is set, so the + single ``else`` branch is sound. 
+ """ + if config.occurrence_id is not None: + if not Occurrence.objects.filter(pk=config.occurrence_id).exists(): + msg = f"Occurrence {config.occurrence_id} not found" + self.logger.error(msg) + raise ValueError(msg) + detections = Detection.objects.filter(occurrence_id=config.occurrence_id) + scope_desc = f"occurrence {config.occurrence_id}" + else: + try: + collection = SourceImageCollection.objects.get(pk=config.source_image_collection_id) + except SourceImageCollection.DoesNotExist: + msg = f"SourceImageCollection {config.source_image_collection_id} not found" + self.logger.error(msg) + raise ValueError(msg) + self.logger.info(f"Loaded SourceImageCollection {collection.pk} (Project={collection.project})") + detections = Detection.objects.filter(source_image__collections=collection) + scope_desc = f"collection {collection.pk}" + return detections.select_related("source_image", "occurrence"), scope_desc def run(self) -> None: - # Could we use a pydantic model for config validation if it's just for this task? 
- threshold = self.config.get("size_threshold", 0.0008) - collection_id = self.config.get("source_image_collection_id") + config: SmallSizeFilterConfig = self.config # type: ignore[assignment] + threshold = config.size_threshold # Get or create the "Not identifiable" taxon not_identifiable_taxon, _ = Taxon.objects.get_or_create( @@ -24,29 +77,19 @@ def run(self) -> None: ) self.logger.info(f"=== Starting {self.name} ===") - if not collection_id: - msg = "Missing required config param: source_image_collection_id" - self.logger.error(msg) - raise ValueError(msg) - - try: - collection = SourceImageCollection.objects.get(pk=collection_id) - self.logger.info(f"Loaded SourceImageCollection {collection_id} (Project={collection.project})") - except SourceImageCollection.DoesNotExist: - msg = f"SourceImageCollection {collection_id} not found" - self.logger.error(msg) - raise ValueError(msg) - - detections = Detection.objects.filter(source_image__collections=collection).select_related( - "source_image", "occurrence" - ) + detections, scope_desc = self._scoped_detections(config) total = detections.count() - self.logger.info(f"Found {total} detections in collection {collection_id}") + self.logger.info(f"Found {total} detections in {scope_desc}") classifications_to_create: list[Classification] = [] # Can't use set until an instance has an id detections_to_update: set[Detection] = set() occcurrences_to_update: set[Occurrence] = set() modified_detections = 0 + # Track occurrence ids across flush batches so an occurrence whose detections + # span more than one batch is counted once, not once per batch. 
+ updated_occurrence_ids: set[int] = set() + modified_occurrences = 0 + checked = 0 for i, det in enumerate(detections.iterator(), start=1): bbox = det.get_bbox() @@ -92,6 +135,7 @@ def run(self) -> None: # Update progress every 100 detections if i % 100 == 0 or i == total: + checked = i modified_detections += len(detections_to_update) # with transaction.atomic(): @@ -108,9 +152,19 @@ def run(self) -> None: self.logger.info(f"Updating {len(occcurrences_to_update)} occurrences") for occ in occcurrences_to_update: occ.save(update_determination=True) + if occ.pk is not None: + updated_occurrence_ids.add(occ.pk) + modified_occurrences = len(updated_occurrence_ids) occcurrences_to_update.clear() progress = i / total if total > 0 else 1.0 self.update_progress(progress) + self.report_stage_metrics( + { + "detections_checked": checked, + "detections_flagged": modified_detections, + "occurrences_updated": modified_occurrences, + } + ) self.logger.info(f"=== Completed {self.name}: {modified_detections} of {total} detections modified ===") diff --git a/ami/ml/post_processing/tests/__init__.py b/ami/ml/post_processing/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ami/ml/post_processing/tests/test_action_factory.py b/ami/ml/post_processing/tests/test_action_factory.py new file mode 100644 index 000000000..395abac3a --- /dev/null +++ b/ami/ml/post_processing/tests/test_action_factory.py @@ -0,0 +1,169 @@ +"""Tests for the shared post-processing admin-action factory. + +Covers the generic glue in ``ami/ml/post_processing/admin/actions.py``: the +action's registered name, the default one-Job-per-row builder (task key, scope +injection, project FK), schema-error mapping back onto form fields, and the +``build_jobs`` override hook used by tasks with a non-default row→Job mapping. 
+ +DB-backed classes build a bare Project/Collection once per class via +``setUpTestData`` — the factory only reads FKs, so the heavier +``setup_test_project`` fixture is unnecessary. +""" +import pytest +from django.contrib import admin as django_admin +from django.test import RequestFactory, SimpleTestCase, TestCase + +from ami.jobs.models import Job +from ami.main.models import Project, SourceImageCollection +from ami.ml.post_processing.admin.actions import ( + ConfigValidationErrors, + default_build_jobs, + make_post_processing_action, +) +from ami.ml.post_processing.admin.small_size_filter_form import SmallSizeFilterActionForm +from ami.ml.post_processing.small_size_filter import SmallSizeFilterTask + + +def _ssf_kwargs(queryset, config): + """Common keyword arguments for invoking the small-size-filter job builder.""" + return dict( + model_admin=None, + request=None, + config=config, + queryset=queryset, + task_cls=SmallSizeFilterTask, + form_field_names={"size_threshold"}, + scope_resolver=lambda c: {"source_image_collection_id": c.pk}, + project_resolver=lambda c: c.project, + name_resolver=lambda task_cls, c: f"SSF {c.pk}", + ) + + +class TestActionRegistration(SimpleTestCase): + """Factory construction is pure function-building — no DB involved.""" + + def test_action_name_is_run_plus_task_key(self): + action = make_post_processing_action( + SmallSizeFilterTask, + SmallSizeFilterActionForm, + scope_resolver=lambda c: {"source_image_collection_id": c.pk}, + ) + self.assertEqual(action.__name__, "run_small_size_filter") + + def test_default_description_derives_from_task_name(self): + action = make_post_processing_action( + SmallSizeFilterTask, + SmallSizeFilterActionForm, + scope_resolver=lambda c: {"source_image_collection_id": c.pk}, + ) + # @admin.action stores the dropdown label on the function. 
+ self.assertEqual(action.short_description, "Run Small size filter post-processing task (async)") + + def test_requires_scope_resolver_or_build_jobs(self): + with pytest.raises(ValueError, match="scope_resolver"): + make_post_processing_action(SmallSizeFilterTask, SmallSizeFilterActionForm) + + +class TestDefaultBuildJobs(TestCase): + @classmethod + def setUpTestData(cls) -> None: + cls.project = Project.objects.create(name="Factory default_build_jobs test") + cls.collection = SourceImageCollection.objects.create(project=cls.project, name="c1") + + def test_creates_one_job_per_row_with_task_key_and_scope(self): + qs = SourceImageCollection.objects.filter(pk=self.collection.pk) + job_pks = default_build_jobs(**_ssf_kwargs(qs, {"size_threshold": 0.001})) + + self.assertEqual(len(job_pks), 1) + job = Job.objects.get(pk=job_pks[0]) + self.assertEqual(job.project_id, self.project.pk) + self.assertEqual(job.job_type_key, "post_processing") + self.assertEqual(job.params["task"], "small_size_filter") + self.assertEqual(job.params["config"]["size_threshold"], 0.001) + self.assertEqual(job.params["config"]["source_image_collection_id"], self.collection.pk) + + def test_out_of_range_config_raises_and_creates_no_jobs(self): + qs = SourceImageCollection.objects.filter(pk=self.collection.pk) + with pytest.raises(ConfigValidationErrors) as exc_info: + default_build_jobs(**_ssf_kwargs(qs, {"size_threshold": 2.0})) + + # The bad field is one the form renders, so it maps to that field (not non-field). + fields = {field for field, _ in exc_info.value.errors} + self.assertIn("size_threshold", fields) + self.assertEqual(Job.objects.filter(job_type_key="post_processing").count(), 0) + + def test_create_failure_midway_rolls_back_all_jobs(self): + # Validation passes for both rows, so we reach the creation loop. Force a + # failure while creating the *second* Job and assert the first one is rolled + # back — i.e. the transaction.atomic() wrap makes creation all-or-nothing. 
+ # Without it, the first INSERT would leak before the failure. + good = SourceImageCollection.objects.create(project=self.project, name="good") + qs = SourceImageCollection.objects.filter(pk__in=[self.collection.pk, good.pk]).order_by("pk") + + seen: list[int] = [] + + def exploding_name_resolver(task_cls, obj): + seen.append(obj.pk) + if len(seen) == 2: + raise RuntimeError("boom while creating the second job") + return f"SSF {obj.pk}" + + kwargs = _ssf_kwargs(qs, {"size_threshold": 0.001}) + kwargs["name_resolver"] = exploding_name_resolver + + with pytest.raises(RuntimeError): + default_build_jobs(**kwargs) + + self.assertEqual(Job.objects.filter(job_type_key="post_processing").count(), 0) + + +class _StubAdmin: + """Minimal ModelAdmin stand-in for invoking an action outside the admin site.""" + + model = SourceImageCollection + admin_site = django_admin.site + + def __init__(self) -> None: + self.messages: list[tuple[str, object]] = [] + + def message_user(self, request, message, level=None, **kwargs) -> None: + self.messages.append((message, level)) + + +class TestBuildJobsOverrideHook(TestCase): + @classmethod + def setUpTestData(cls) -> None: + cls.project = Project.objects.create(name="Factory build_jobs hook test") + cls.collection = SourceImageCollection.objects.create(project=cls.project, name="c1") + + def test_custom_build_jobs_is_used_instead_of_default(self): + calls = {} + + def custom_build_jobs(**kwargs): + calls.update(kwargs) + return [101, 102] + + action = make_post_processing_action( + SmallSizeFilterTask, + SmallSizeFilterActionForm, + build_jobs=custom_build_jobs, + ) + # build_jobs supplied, so scope_resolver is optional (no ValueError on construction). + self.assertEqual(action.__name__, "run_small_size_filter") + + # Actually invoke the action through the confirm leg so the custom runner runs. 
+ request = RequestFactory().post("/", data={"confirm": "yes", "size_threshold": "0.001"}) + admin_stub = _StubAdmin() + queryset = SourceImageCollection.objects.filter(pk=self.collection.pk) + + result = action(admin_stub, request, queryset) + + # Success leg returns None and reports the pks the custom runner produced. + self.assertIsNone(result) + self.assertTrue(calls, "custom build_jobs was never invoked") + self.assertEqual(calls["task_cls"], SmallSizeFilterTask) + self.assertEqual(calls["config"], {"size_threshold": 0.001}) + # scope_resolver was not supplied, so it must not be forwarded as None. + self.assertNotIn("scope_resolver", calls) + self.assertEqual(len(admin_stub.messages), 1) + self.assertIn("[101, 102]", admin_stub.messages[0][0]) diff --git a/ami/ml/post_processing/tests/test_admin_form.py b/ami/ml/post_processing/tests/test_admin_form.py new file mode 100644 index 000000000..d74f19b93 --- /dev/null +++ b/ami/ml/post_processing/tests/test_admin_form.py @@ -0,0 +1,45 @@ +"""Tests for ``BasePostProcessingActionForm`` + concrete ``SmallSizeFilterActionForm``. + +The knob form only declares field presentation; the valid range for +``size_threshold`` is owned by ``SmallSizeFilterConfig`` (the schema), not the +form. Bound enforcement therefore lives in ``test_base_schema.py`` and in the +admin-action flow (``test_small_size_filter_admin.py`` / +``test_action_factory.py``), not here. The happy valid-value path is likewise +covered end to end by those flows. Forms never touch the DB → SimpleTestCase. 
+""" +from django import forms +from django.test import SimpleTestCase + +from ami.ml.post_processing.admin.forms import BasePostProcessingActionForm +from ami.ml.post_processing.admin.small_size_filter_form import SmallSizeFilterActionForm + + +class _OneFieldForm(BasePostProcessingActionForm): + threshold = forms.FloatField(initial=0.5) + + +class TestBasePostProcessingActionForm(SimpleTestCase): + def test_to_config_returns_cleaned_data(self): + form = _OneFieldForm(data={"threshold": "0.25"}) + self.assertTrue(form.is_valid()) + self.assertEqual(form.to_config(), {"threshold": 0.25}) + + +class TestSmallSizeFilterActionForm(SimpleTestCase): + def test_default_initial_matches_config_default(self): + form = SmallSizeFilterActionForm() + self.assertEqual(form.fields["size_threshold"].initial, 0.0008) + + def test_non_numeric_threshold_rejected_at_form_layer(self): + # Type coercion is still the form's job; range is not. + form = SmallSizeFilterActionForm(data={"size_threshold": "not-a-number"}) + self.assertFalse(form.is_valid()) + self.assertIn("size_threshold", form.errors) + + def test_out_of_range_threshold_passes_form_but_is_rejected_by_schema(self): + # The form no longer enforces the (0, 1) bound — that is the schema's job. + # An out-of-range value is a valid float, so the form accepts it; the admin + # action then rejects it when validating against SmallSizeFilterConfig. 
+ form = SmallSizeFilterActionForm(data={"size_threshold": "2.0"}) + self.assertTrue(form.is_valid()) + self.assertEqual(form.to_config(), {"size_threshold": 2.0}) diff --git a/ami/ml/post_processing/tests/test_base_schema.py b/ami/ml/post_processing/tests/test_base_schema.py new file mode 100644 index 000000000..79380a40c --- /dev/null +++ b/ami/ml/post_processing/tests/test_base_schema.py @@ -0,0 +1,75 @@ +"""Tests for the pydantic ``config_schema`` contract on ``BasePostProcessingTask``.""" +import abc +import inspect + +import pydantic +import pytest +from django.test import TestCase + +from ami.ml.post_processing.base import BasePostProcessingTask +from ami.ml.post_processing.small_size_filter import SmallSizeFilterConfig, SmallSizeFilterTask + + +class TestConfigSchemaContract(TestCase): + """``__init_subclass__`` enforces ``config_schema``; ``__init__`` validates against it.""" + + def test_subclass_without_config_schema_raises(self): + with pytest.raises(TypeError, match="config_schema"): + + class Missing(BasePostProcessingTask): + key = "missing" + name = "Missing schema" + + def run(self) -> None: + pass + + def test_abstract_subclass_is_not_required_to_declare_contract(self): + # An abstract intermediary (leaves an abstractmethod unimplemented) may defer + # key/name/config_schema to its concrete subclasses without raising. + class AbstractFilter(BasePostProcessingTask): + @abc.abstractmethod + def filter_step(self) -> None: + ... 
+ + def run(self) -> None: # pragma: no cover - never instantiated + pass + + self.assertTrue(inspect.isabstract(AbstractFilter)) + + def test_valid_config_builds_basemodel_instance(self): + task = SmallSizeFilterTask(source_image_collection_id=1, size_threshold=0.001) + self.assertIsInstance(task.config, SmallSizeFilterConfig) + config: SmallSizeFilterConfig = task.config # type: ignore[assignment] + self.assertEqual(config.size_threshold, 0.001) + self.assertEqual(config.source_image_collection_id, 1) + + def test_default_value_applies_when_omitted(self): + task = SmallSizeFilterTask(source_image_collection_id=1) + config: SmallSizeFilterConfig = task.config # type: ignore[assignment] + self.assertEqual(config.size_threshold, 0.0008) + + def test_occurrence_scope_is_accepted(self): + # The discriminated scope: an occurrence id is a valid alternative to a + # collection id (the per-occurrence / dev trigger). + task = SmallSizeFilterTask(occurrence_id=7) + config: SmallSizeFilterConfig = task.config # type: ignore[assignment] + self.assertEqual(config.occurrence_id, 7) + self.assertIsNone(config.source_image_collection_id) + + def test_both_scopes_at_once_raises(self): + # Exactly one scope must be set; supplying both is ambiguous. + with pytest.raises(pydantic.ValidationError): + SmallSizeFilterTask(source_image_collection_id=1, occurrence_id=7) + + def test_no_scope_raises(self): + # Neither scope set — nothing identifies which detections to examine. 
+ with pytest.raises(pydantic.ValidationError): + SmallSizeFilterTask(size_threshold=0.001) + + def test_invalid_config_raises_at_init(self): + with pytest.raises(pydantic.ValidationError): + SmallSizeFilterTask(source_image_collection_id=1, size_threshold=2.0) + + def test_unknown_keys_rejected(self): + with pytest.raises(pydantic.ValidationError): + SmallSizeFilterTask(source_image_collection_id=1, unknown_field="oops") diff --git a/ami/ml/post_processing/tests/test_small_size_filter_admin.py b/ami/ml/post_processing/tests/test_small_size_filter_admin.py new file mode 100644 index 000000000..3977b47e5 --- /dev/null +++ b/ami/ml/post_processing/tests/test_small_size_filter_admin.py @@ -0,0 +1,162 @@ +"""Admin-action tests for the migrated Small Size Filter trigger. + +Covers the intermediate confirmation page, the per-collection Job creation, +the schema-validated config payload, and form-error re-render. + +Fixtures are intentionally minimal: bare Project/Collection/Occurrence rows, +created once per class via ``setUpTestData``. The admin flow only reads FKs — +it never touches captures, taxa, or events — so the full ``setup_test_project`` +fixture (storage source, deployment, processing service per call) is wasted +cost here and was what made this module the slowest part of the suite. 
+""" +from django.contrib import admin as django_admin +from django.test import Client, TestCase +from django.urls import reverse + +from ami.jobs.models import Job +from ami.main.models import Occurrence, Project, SourceImageCollection +from ami.users.models import User + + +class _SmallSizeFilterAdminCase(TestCase): + @classmethod + def setUpTestData(cls) -> None: + cls.superuser = User.objects.create_superuser( + email=f"ssfadmin+{cls.__name__}@example.com", + password="x", + ) + cls.project = Project.objects.create(name=f"SSF admin test ({cls.__name__})") + cls.collection = SourceImageCollection.objects.create( + project=cls.project, + name="SSF admin test collection", + ) + + def setUp(self) -> None: + self.client = Client() + self.client.force_login(self.superuser) + + def _post(self, data: dict, pks: list[int] | None = None): + url = reverse("admin:main_sourceimagecollection_changelist") + selected = [str(pk) for pk in (pks or [self.collection.pk])] + payload = { + "action": "run_small_size_filter", + django_admin.helpers.ACTION_CHECKBOX_NAME: selected, + **data, + } + return self.client.post(url, data=payload) + + +class TestSmallSizeFilterIntermediatePage(_SmallSizeFilterAdminCase): + def test_renders_intermediate_page_without_confirm(self): + response = self._post({}) + self.assertEqual(response.status_code, 200) + # Labels derive from SmallSizeFilterTask.name ("Small size filter"). + self.assertIn(b"Run Small size filter", response.content) + # The form's field is named size_threshold; the rendered input element uses that name. + self.assertIn(b'name="size_threshold"', response.content) + # No Job created on the GET-equivalent step. + self.assertEqual( + Job.objects.filter(project=self.project, job_type_key="post_processing").count(), + 0, + ) + + def test_select_across_is_refused_without_creating_jobs(self): + # "Select all across pages" would serialize the whole table into hidden + # inputs; the action refuses it instead of rendering an unbounded form.
+ response = self._post({"confirm": "yes", "size_threshold": "0.001", "select_across": "1"}) + self.assertEqual(response.status_code, 302) + self.assertEqual( + Job.objects.filter(project=self.project, job_type_key="post_processing").count(), + 0, + ) + + def test_invalid_threshold_rerenders_form_with_error(self): + response = self._post({"confirm": "yes", "size_threshold": "2.0"}) + self.assertEqual(response.status_code, 200) + # No Job created: the form accepts 2.0 but the schema rejects it. + self.assertEqual( + Job.objects.filter(project=self.project, job_type_key="post_processing").count(), + 0, + ) + # Error class present in rendered template. + self.assertIn(b"errornote", response.content) + + +class TestSmallSizeFilterCreatesJob(_SmallSizeFilterAdminCase): + def test_valid_post_creates_one_job_with_threshold_in_config(self): + response = self._post({"confirm": "yes", "size_threshold": "0.001"}) + self.assertEqual(response.status_code, 302) + + job = Job.objects.get( + project=self.project, + job_type_key="post_processing", + ) + self.assertEqual(job.params["task"], "small_size_filter") + self.assertEqual(job.params["config"]["size_threshold"], 0.001) + self.assertEqual(job.params["config"]["source_image_collection_id"], self.collection.pk) + + +class TestSmallSizeFilterOccurrenceScope(TestCase): + """The per-occurrence trigger on OccurrenceAdmin uses the same factory with an + ``occurrence_id`` scope — the fast spot/dev path for iterating on a filter.""" + + @classmethod + def setUpTestData(cls) -> None: + cls.superuser = User.objects.create_superuser(email="ssfocc@example.com", password="x") + cls.project = Project.objects.create(name="SSF occurrence-scope test") + cls.occurrence = Occurrence.objects.create(project=cls.project) + + def setUp(self) -> None: + self.client = Client() + self.client.force_login(self.superuser) + + def test_valid_post_creates_one_job_scoped_to_the_occurrence(self): + url = reverse("admin:main_occurrence_changelist") + response = self.client.post( + url, +
data={ + "action": "run_small_size_filter", + django_admin.helpers.ACTION_CHECKBOX_NAME: [str(self.occurrence.pk)], + "confirm": "yes", + "size_threshold": "0.001", + }, + ) + self.assertEqual(response.status_code, 302) + + job = Job.objects.get(project=self.project, job_type_key="post_processing") + self.assertEqual(job.params["task"], "small_size_filter") + self.assertEqual(job.params["config"]["occurrence_id"], self.occurrence.pk) + # Collection scope stays absent so the schema's exactly-one-scope rule holds. + self.assertIsNone(job.params["config"].get("source_image_collection_id")) + + +class TestSmallSizeFilterMultiCollection(_SmallSizeFilterAdminCase): + @classmethod + def setUpTestData(cls) -> None: + super().setUpTestData() + # Second collection in a different project. + cls.other_project = Project.objects.create(name="SSF admin test (other project)") + cls.other_collection = SourceImageCollection.objects.create( + project=cls.other_project, + name="Other collection", + ) + + def test_multi_collection_creates_one_job_per_collection_with_correct_project_fk(self): + response = self._post( + {"confirm": "yes", "size_threshold": "0.001"}, + pks=[self.collection.pk, self.other_collection.pk], + ) + self.assertEqual(response.status_code, 302) + + jobs = Job.objects.filter(job_type_key="post_processing").order_by("project_id") + self.assertEqual(jobs.count(), 2) + + by_project = {j.project_id: j for j in jobs} + self.assertEqual( + by_project[self.project.pk].params["config"]["source_image_collection_id"], + self.collection.pk, + ) + self.assertEqual( + by_project[self.other_project.pk].params["config"]["source_image_collection_id"], + self.other_collection.pk, + ) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index ea375135e..f3f4f25db 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -1324,24 +1324,29 @@ def test_labels_data_conversion_methods(self): class TestPostProcessingTasks(TestCase): - def setUp(self): - # Create test project, deployment, and 
default setup - self.project, self.deployment = setup_test_project() - create_taxa(project=self.project) - self._create_images_with_dimensions(deployment=self.deployment) - group_images_into_events(deployment=self.deployment) + @classmethod + def setUpTestData(cls): + # Project, taxa, images, events, and the collection are read-only from the + # tests' point of view — build them once per class. Detections (and the + # task runs that mutate them) happen per-test inside each test's + # rolled-back transaction. + cls.project, cls.deployment = setup_test_project() + create_taxa(project=cls.project) + cls._create_images_with_dimensions(deployment=cls.deployment) + group_images_into_events(deployment=cls.deployment) # Create a simple SourceImageCollection for testing - self.collection = SourceImageCollection.objects.create( + cls.collection = SourceImageCollection.objects.create( name="Test PostProcessing Collection", - project=self.project, + project=cls.project, method="manual", - kwargs={"image_ids": list(self.deployment.captures.values_list("pk", flat=True))}, + kwargs={"image_ids": list(cls.deployment.captures.values_list("pk", flat=True))}, ) - self.collection.populate_sample() + cls.collection.populate_sample() + @classmethod def _create_images_with_dimensions( - self, + cls, deployment, num_images: int = 5, width: int = 640, @@ -1414,6 +1419,76 @@ def test_small_size_filter_assigns_not_identifiable(self): f"Occurrence {occurrence.pk} should have its determination set to 'Not identifiable'.", ) + def test_occurrence_scope_only_touches_that_occurrence(self): + """Per-occurrence scope: running with ``occurrence_id`` flags only that + occurrence's detections and leaves sibling occurrences untouched.""" + detections = [] + for image in self.collection.images.all(): + det = Detection.objects.create( + source_image=image, + bbox=[0, 0, 10, 10], # small + created_at=datetime.datetime.now(datetime.timezone.utc), + ) + det.associate_new_occurrence() + 
detections.append(det) + self.assertGreaterEqual(len(detections), 2) + + target = detections[0] + SmallSizeFilterTask(occurrence_id=target.occurrence_id, size_threshold=0.01).run() + + not_identifiable_taxon = Taxon.objects.get(name="Not identifiable") + self.assertEqual( + Classification.objects.filter(detection=target, taxon=not_identifiable_taxon).count(), + 1, + "The scoped occurrence's detection should be flagged.", + ) + for other in detections[1:]: + self.assertFalse( + Classification.objects.filter(detection=other, taxon=not_identifiable_taxon).exists(), + f"Detection {other.pk} outside the scoped occurrence should be untouched.", + ) + + def test_run_reports_stage_metrics_on_job(self): + """The task surfaces ``detections_checked`` / ``detections_flagged`` / + ``occurrences_updated`` as stage params on its Job so an operator can see + what a run examined and changed without reading the log.""" + from ami.jobs.models import Job + + for image in self.collection.images.all(): + Detection.objects.create( + source_image=image, + bbox=[0, 0, 10, 10], # small → flagged + created_at=datetime.datetime.now(datetime.timezone.utc), + ).associate_new_occurrence() + total = Detection.objects.filter(source_image__in=self.collection.images.all()).count() + self.assertGreater(total, 0) + + job = Job.objects.create( + project=self.project, + name="stage metrics test", + job_type_key="post_processing", + params={ + "task": "small_size_filter", + "config": {"source_image_collection_id": self.collection.pk, "size_threshold": 0.01}, + }, + ) + job.progress.add_stage("Post Processing", key="post_processing") + job.save() + + SmallSizeFilterTask( + job=job, + source_image_collection_id=self.collection.pk, + size_threshold=0.01, + ).run() + + job.refresh_from_db() + params = {p.name: p.value for p in job.progress.get_stage("post_processing").params} + self.assertEqual(params.get("detections_checked"), total) + self.assertEqual(params.get("detections_flagged"), total) # every 
detection is small + # Each detection has its own occurrence here, so the deduped occurrence + # count equals the detection count. + self.assertEqual(params.get("occurrences_updated"), total) + class TestTaskStateManager(TestCase): """Test TaskStateManager for job progress tracking.""" diff --git a/ami/templates/admin/post_processing/_form_fieldset.html b/ami/templates/admin/post_processing/_form_fieldset.html new file mode 100644 index 000000000..968292113 --- /dev/null +++ b/ami/templates/admin/post_processing/_form_fieldset.html @@ -0,0 +1,9 @@ +{% for error in form.non_field_errors %}

{{ error }}

{% endfor %} +{% for field in form %} +
+ {{ field.label_tag }} + {{ field }} + {% if field.help_text %}

{{ field.help_text }}

{% endif %} + {% for error in field.errors %}

{{ error }}

{% endfor %} +
+{% endfor %} diff --git a/ami/templates/admin/post_processing/confirmation.html b/ami/templates/admin/post_processing/confirmation.html new file mode 100644 index 000000000..565f3f2e0 --- /dev/null +++ b/ami/templates/admin/post_processing/confirmation.html @@ -0,0 +1,37 @@ +{% extends "admin/base_site.html" %} + +{% load i18n admin_urls %} + +{% block title %} + {{ title }} | {{ site_title|default:_("Django site admin") }} +{% endblock title %} +{% block breadcrumbs %} + +{% endblock breadcrumbs %} +{% block content %} +
+ {% csrf_token %} + {% block intro %} +

+ You are about to run {{ task_label }} on + {{ selected_count }} selected + {{ model_meta.verbose_name }}{{ selected_count|pluralize }}. +

+ {% endblock intro %} +
+ {% translate "Parameters" %} + {% include "admin/post_processing/_form_fieldset.html" with form=form %} +
+ {% for pk in selected_pks %}{% endfor %} + + + +
+{% endblock content %} diff --git a/docs/claude/planning/2026-05-01-post-processing-admin-scaffolding-design.md b/docs/claude/planning/2026-05-01-post-processing-admin-scaffolding-design.md new file mode 100644 index 000000000..d2ebbc991 --- /dev/null +++ b/docs/claude/planning/2026-05-01-post-processing-admin-scaffolding-design.md @@ -0,0 +1,347 @@ +# Post-Processing Admin Scaffolding — Design + +**Status:** Draft (awaiting user review) +**Date:** 2026-05-01 +**Branch:** `feat/post-processing-admin-scaffolding` +**Author:** Michael Bunsen (with Claude Opus 4.7) + +> **Update (2026-06-04):** Following review feedback, the admin controller glue +> was abstracted rather than left as per-task copy-paste. The confirm/render/ +> validate/enqueue flow now lives in `ami/ml/post_processing/admin/actions.py` +> as `make_post_processing_action(task_cls, form_class, scope_resolver=..., build_jobs=...)`. +> Each task declares only what varies (task class, knob form, row→Job mapping); +> tasks that don't fit one-Job-per-row (e.g. #1272's per-project event +> partitioning) pass a custom `build_jobs` callable. Config validation is owned +> solely by the task's pydantic `config_schema` — the knob form no longer +> re-encodes the bounds, and schema errors are mapped back onto the form for +> inline display. This supersedes the "module-private `_render_confirmation`, +> lift later" plan in the "Admin Action Rewrite" section below. + +## Context + +`ami/ml/post_processing/` currently ships one task on main: `SmallSizeFilterTask` (PR #954, merged). Two open PRs add more post-processing tasks and each independently grew its own admin-trigger plumbing: + +- **PR #999** (`feat/postprocessing-class-masking`, mohamedelabbas1996, open since 2025-10-14) adds `class_masking` and `rank_rollup` tasks. Admin trigger uses hand-rolled HTML in the action method; no `forms.Form` class. +- **PR #1272** (`claude/revive-tracking-feature-OyMO3`, current author, open) adds `tracking` task. 
Admin trigger uses a `forms.Form` subclass in `ami/ml/post_processing/admin_forms.py` with scope-aware dropdown init and per-project Job partitioning. + +Both PRs touch `ami/main/admin.py`, `ami/ml/post_processing/registry.py`, and add a near-identical `*_confirmation.html` template. Three independent ad-hoc patterns are emerging where one shared one would do. + +The existing `SmallSizeFilterTask` already reads `size_threshold` from `Job.params['config']` (default `0.0008`) — but the admin trigger hardcodes empty config, so the knob is unreachable. There's even a TODO comment in `small_size_filter.py:14` asking *"Could we use a pydantic model for config validation if it's just for this task?"*. This precursor PR answers that question and lands the answer as the shared pattern. + +## Goal + +Land a small precursor PR that establishes the shared admin-trigger pattern for post-processing tasks, using `SmallSizeFilterTask` as the migration consumer. Both #999 and #1272 rebase onto it and adopt the pattern. No new domain logic, no PR coordination drama, no carving up another contributor's work. + +Admin is **not** the long-term primary trigger surface for post-processing — REST API + UI will eventually drive this. The scaffolding here optimises for current-state needs (admin-only) without painting future API integration into a corner. + +## Scope + +### In + +1. **Pydantic config schema contract** on `BasePostProcessingTask` +2. **`BasePostProcessingActionForm`** — django form base class, `cleaned_data → config` contract +3. **Parameterized confirmation template** + form-fieldset partial +4. 
**Migrate `SmallSizeFilterTask`** to new pattern: + - Add `SmallSizeFilterConfig(size_threshold: float = 0.0008, source_image_collection_id: int)` pydantic model + - Add `SmallSizeFilterActionForm` with one field: `size_threshold` (`FloatField`, validation: `0 < x < 1`) + - Rewrite `SourceImageCollectionAdmin.run_small_size_filter` to render intermediate confirmation page using new template + form +5. **Tests** for scaffolding + migrated task + +### Out + +- Project-partitioning helper (defer to whichever multi-scope adopter lands first — #999 or #1272) +- REST API endpoints for triggering post-processing +- Management commands +- pgvector migrations (#1272 territory) +- Rank rollup (stays in #999 — PR coordination unnecessary now) +- Class masking (stays in #999) +- Tracking (stays in #1272) + +## Module Layout + +```text +ami/ml/post_processing/ +├── base.py # MODIFIED — +config_schema contract +├── registry.py # unchanged +├── small_size_filter.py # MODIFIED — schema-validated config, .config now BaseModel +├── admin/ # NEW +│ ├── __init__.py +│ ├── forms.py # BasePostProcessingActionForm +│ └── small_size_filter_form.py # SmallSizeFilterActionForm +└── tests/ # NEW directory (existing tests in ami/ml/tests.py) + ├── __init__.py + ├── test_base_schema.py + ├── test_admin_form.py + └── test_small_size_filter_admin.py + +ami/templates/admin/post_processing/ # NEW +├── confirmation.html # parameterized shell +└── _form_fieldset.html # partial — renders form fields uniformly + +ami/main/admin.py # MODIFIED — run_small_size_filter rewrites onto new pattern +``` + +Path note: spec uses `ami/ml/post_processing/admin/` as user requested. Tracking PR's `admin_forms.py` (top-level module) becomes `admin/tracking_form.py` on its rebase. + +## Pydantic Schema Contract + +`BasePostProcessingTask` gains a required class attribute `config_schema: type[BaseModel]` and validates config at construction. 
+ +```python +# ami/ml/post_processing/base.py (sketch) +import pydantic + +class BasePostProcessingTask(abc.ABC): + key: str + name: str + config_schema: type[pydantic.BaseModel] # NEW + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + for attr in ("key", "name", "config_schema"): + if not hasattr(cls, attr) or getattr(cls, attr) is None: + raise TypeError(f"{cls.__name__} must define '{attr}' class attribute") + + def __init__(self, job=None, logger=None, **config): + self.job = job + # Validate config against schema. Raises pydantic.ValidationError on bad input. + self.config: pydantic.BaseModel = self.config_schema(**config) + # ... existing logger + algorithm setup unchanged ... +``` + +**Pydantic version:** repo uses Pydantic v1 (per `requirements/base.txt` + container-side memory). Use v1 syntax: `BaseModel`, `Field`, `validator`, `.dict()`. No `model_dump`/`model_validate`. + +**Validation timing:** + +- **Worker side (always):** `BasePostProcessingTask.__init__` validates. Bad config in a Job's params → task crashes with clear pydantic error in job logs. +- **Admin side (added in this PR):** the rewritten `run_small_size_filter` calls `SmallSizeFilterConfig(**form.to_config(), source_image_collection_id=collection.pk)` *before* `Job.objects.create`. Validation error → form re-renders with error, no Job created. + +**Per-task schema example:** + +```python +# ami/ml/post_processing/small_size_filter.py +class SmallSizeFilterConfig(pydantic.BaseModel): + source_image_collection_id: int + size_threshold: float = 0.0008 + + @pydantic.validator("size_threshold") + def _threshold_in_unit_interval(cls, v): + if not (0.0 < v < 1.0): + raise ValueError("size_threshold must be in (0, 1) exclusive") + return v + + class Config: + extra = "forbid" # unknown keys rejected — catches typos +``` + +**Migration impact for existing in-flight Jobs:** none. 
`Job.params['config']` payloads already match the new schema (only `source_image_collection_id` is required; `size_threshold` defaults). Workers picking up old jobs after deploy will validate cleanly. + +## Admin Form Base + +```python +# ami/ml/post_processing/admin/forms.py +class BasePostProcessingActionForm(forms.Form): + """Base for admin forms that build BasePostProcessingTask config dicts. + + Subclass adds knob fields. Override to_config() if mapping isn't 1:1 + (e.g. drop empty optional fields, derive computed values). + """ + + def to_config(self) -> dict: + return dict(self.cleaned_data) +``` + +That's it. The form base is intentionally thin — it's a contract marker (so admin actions know which type of form to render) plus one helper. Scope-aware kwargs (`events=`, `collection=`) are subclass-specific and don't belong on the base. + +## Confirmation Template + +```html +{# ami/templates/admin/post_processing/confirmation.html #} +{% extends "admin/base_site.html" %} +{% load i18n admin_urls %} + +{% block title %}{{ title }} | {{ site_title|default:_("Django site admin") }}{% endblock %} + +{% block breadcrumbs %} + +{% endblock %} + +{% block content %} +
+ {% csrf_token %} + + {% block intro %} +

You are about to run {{ task_label }} on + {{ selected_count }} selected + {{ model_meta.verbose_name }}{{ selected_count|pluralize }}.

+ {% endblock %} + +
+ {% translate "Parameters" %} + {% include "admin/post_processing/_form_fieldset.html" with form=form %} +
+ + {% for pk in selected_pks %} + + {% endfor %} + + + + +
+{% endblock %} +``` + +```html +{# ami/templates/admin/post_processing/_form_fieldset.html #} +{% for field in form %} +
+ {{ field.label_tag }} + {{ field }} + {% if field.help_text %}

{{ field.help_text }}

{% endif %} + {% for error in field.errors %}

{{ error }}

{% endfor %} +
+{% endfor %} +``` + +Per-task templates (e.g. tracking) extend the shell + override `{% block intro %}` for task-specific preamble. Small-size-filter uses bare shell. + +## Admin Action Rewrite + +```python +# ami/main/admin.py (sketch — only the changed action) +from ami.ml.post_processing.admin.small_size_filter_form import SmallSizeFilterActionForm +from ami.ml.post_processing.small_size_filter import SmallSizeFilterConfig + +@admin.action(description="Run Small Size Filter post-processing task (async)") +def run_small_size_filter(self, request, queryset): + if request.POST.get("confirm"): + form = SmallSizeFilterActionForm(request.POST) + if not form.is_valid(): + return _render_confirmation(request, queryset, form) + cfg = form.to_config() + jobs = [] + for collection in queryset: + try: + validated = SmallSizeFilterConfig( + **cfg, + source_image_collection_id=collection.pk, + ) + except pydantic.ValidationError as exc: + self.message_user(request, f"Bad config for collection {collection.pk}: {exc}", level="error") + continue + job = Job.objects.create( + name=f"Post-processing: SmallSizeFilter on Capture Set {collection.pk}", + project=collection.project, + job_type_key="post_processing", + params={"task": "small_size_filter", "config": validated.dict()}, + ) + job.enqueue() + jobs.append(job.pk) + self.message_user(request, f"Queued Small Size Filter for {len(jobs)} capture set(s). 
Jobs: {jobs}") + return None + + return _render_confirmation(request, queryset, SmallSizeFilterActionForm()) + + +def _render_confirmation(request, queryset, form): + return TemplateResponse( + request, + "admin/post_processing/confirmation.html", + { + **self.admin_site.each_context(request), + "title": "Run Small Size Filter", + "task_label": "Small Size Filter", + "form": form, + "selected_count": queryset.count(), + "selected_pks": [str(o.pk) for o in queryset], + "action_name": "run_small_size_filter", + "submit_label": "Run Small Size Filter", + "changelist_url": reverse("admin:main_sourceimagecollection_changelist"), + "model_meta": self.model._meta, + "opts": self.model._meta, + "action_checkbox_name": admin.helpers.ACTION_CHECKBOX_NAME, + }, + ) +``` + +`_render_confirmation` is a module-private helper near the action; not a class method on the admin site. Note that the sketch above elides how the helper obtains the `ModelAdmin` instance: it reads `self.admin_site` and `self.model`, so a real implementation must receive the admin instance as an explicit argument. If a future PR finds itself duplicating it across admins, lift it to `ami/ml/post_processing/admin/helpers.py` then. + +## Tests + +All new test files live under `ami/ml/post_processing/tests/`. New tests do not touch the existing `ami/ml/tests.py` file (which holds older post-processing smoke tests).
+ +**`test_base_schema.py`:** +- Subclassing without `config_schema` raises `TypeError` +- Bad config dict raises `pydantic.ValidationError` at task construction +- Valid config builds task, `task.config` is a `BaseModel` instance +- Unknown keys rejected (`extra="forbid"` semantics) + +**`test_admin_form.py`:** +- `BasePostProcessingActionForm.to_config()` returns dict matching `cleaned_data` +- `SmallSizeFilterActionForm` validates `size_threshold` in `(0, 1)` exclusive +- Form errors render in confirmation template (smoke render via Django test client) + +**`test_small_size_filter_admin.py`:** +- GET-equivalent (POST without `confirm`) renders intermediate page; no Job created +- POST with valid `confirm=yes` + `size_threshold=0.001` creates Job per collection with that threshold in `params['config']` +- POST with `size_threshold=2.0` re-renders form with error; no Job +- Multi-collection POST creates one Job per collection, each with correct project FK + +Existing `SmallSizeFilterTask` behavior tests (in `ami/ml/tests.py`, if any) should still pass — schema validation is additive, default value preserved. + +## Rebase Impact + +### PR #1272 (tracking) + +Net change: smaller diff. 
+ +- `ami/ml/post_processing/admin_forms.py` → `ami/ml/post_processing/admin/tracking_form.py` (location move) +- `TrackingActionForm` extends `BasePostProcessingActionForm`, gains `to_config()` override that drops empty `feature_extraction_algorithm_id` +- `tracking_confirmation.html` extends new shell + overrides `{% block intro %}` for "you are about to run tracking on N events…" preamble +- New `TrackingConfig(pydantic.BaseModel)` schema replaces freeform dict; `tracking_task.py` reads typed `self.config` +- Admin actions in `ami/main/admin.py` reuse new template via `_render_confirmation` helper (or its lifted version) +- Per-project Job partitioning loop stays in #1272 (this PR doesn't ship the helper) + +Coordination: I (current author) own both #1272 and the precursor, so this rebase is internal. + +### PR #999 (class masking) + +Net change: smaller diff, but bigger lift than #1272 because #999 used hand-rolled HTML. + +- Hand-rolled `` HTML → `ClassMaskingActionForm(BasePostProcessingActionForm)` with `ModelChoiceField(queryset=TaxaList.objects.…)` + `ModelChoiceField(queryset=Algorithm.objects.…)` +- `class_masking_confirmation.html` becomes thin override of new shell with masking-specific intro/preview block +- New `ClassMaskingConfig` + `RankRollupConfig` schemas +- Admin action validates form + builds typed config via `to_config()` instead of pulling from `request.POST` directly + +Coordination: post-merge of precursor PR, message mohamedelabbas1996 in PR #999 with rebase guidance + concrete diff suggestions. Their existing rank-rollup work is unaffected; only the class-masking trigger needs reshaping. + +## Risks + +1. **Pydantic v1 vs v2 mismatch.** Container is v1 (per memory `MEMORY.md`: "Container uses Pydantic v1 — use `.dict()` / `.json()`, not `.model_dump()` / `.model_dump_json()`"). Spec uses v1 syntax throughout. CI runs in container, so v1 is enforced. + +2. 
**`__init_subclass__` strictness change.** Adding `config_schema` to the required attrs would break any out-of-tree subclass, but only in-tree consumers exist. A check shows `SmallSizeFilterTask` (will be migrated in this PR) and the `BasePostProcessingTask` usages referenced in #1272 + #999 (rebase territory). Acceptable. + +3. **Pydantic `BaseModel` in `Job.params['config']`.** Stored as dict via `validated.dict()`. JSONField round-trip is lossless for primitive-typed schemas. Risk: if a future schema uses `datetime` or non-JSON-native types, serialization needs explicit `.json()` → `json.loads(...)` round-trip. Out of scope for this PR (small-size-filter has only `int` + `float`). + +4. **Test runner uses `--keepdb`.** Existing `test_ami` DB has prior `SmallSizeFilterTask` migration. New tests don't add migrations. Should pass cleanly; verify with `docker compose -f docker-compose.ci.yml run --rm django python manage.py test ami.ml.post_processing.tests --keepdb`. + +5. **Form action POST vs GET ergonomics.** Django admin actions are POST-only. The "render confirmation page" leg uses POST without `confirm` flag. Existing pattern in #1272 + #999. No new risk. + +## Out of Scope (Future Work) + +- **Project-partitioning helper** (`enqueue_post_processing_jobs(queryset, task_cls, cleaned_data, scope_resolver)`). Belongs in whichever multi-scope adopter lands first (likely #1272, since tracking partitions per project; #999's masking is single-Occurrence-scoped per row). +- **REST API surface** for triggering post-processing from UI. Eventual replacement for admin trigger as primary surface. +- **Schema-driven form generation** (auto-build a `ModelForm`-style form from a pydantic schema). Tempting but premature; current task count = 1, second adopter has scope-aware dropdowns that don't fit auto-generation. 
+- **Job param schema versioning.** Once multiple post-processing tasks ship and config schemas evolve, a `schema_version` field on the schema may be needed for backward-compat with old Job rows. Defer until the first breaking schema change. + +## Implementation Plan + +To be drafted by the writing-plans skill after the user approves this spec.