def _log_missing_state_context(job_id: int, stage: str) -> None:
    """
    Log diagnostic context when a job's progress-state key is missing.

    Status and age separate the two cases: a terminal (or cancelling) job
    means a late result arrived after the job was already done (benign, e.g.
    a cancel already cleaned up the state), while a still-running job with no
    state is the one worth investigating (state never seeded, or wiped by a
    re-dispatched run). Observation only; the fix is planned in #1337.

    Args:
        job_id: Primary key of the job whose progress state is missing.
        stage: Stage label included in the log line; the call sites visible
            in this file pass "process" and "results".
    """
    from django.utils import timezone

    from ami.jobs.models import Job, JobState  # avoid circular import

    try:
        row = Job.objects.values("status", "created_at").get(pk=job_id)
    except Job.DoesNotExist:
        logger.warning("Job %s: progress state missing and the job no longer exists (stage=%s).", job_id, stage)
        return

    status = row["status"]
    created_at = row["created_at"]

    # Job age in seconds, rounded to one decimal; stays None when created_at
    # is unset.
    age_s = None
    if created_at:
        age_s = round((timezone.now() - created_at).total_seconds(), 1)

    # Mirror _fail_job's no-op set: a job that is already terminal OR
    # cancelling will not actually be failed, so a late result for it is
    # expected cleanup, not an anomaly.
    expected_late_statuses = {JobState.CANCELING, *JobState.final_states()}
    if status in expected_late_statuses:
        logger.info(
            "Job %s: result arrived after the job already finished (status=%s, stage=%s); ignoring.",
            job_id,
            status,
            stage,
        )
        return

    logger.warning(
        "Job %s: progress state missing while the job is still running "
        "(status=%s, stage=%s, age=%ss); marking it failed.",
        job_id,
        status,
        stage,
        age_s,
    )
+ job.logger.warning( + "Stage '%s' completed but the job was already in a terminal state; not applying %s.", + stage, + complete_state, + ) # status/finished_at are deliberately NOT in this save() — only the # guarded UPDATE above writes them. Folding them back in reopens #1337.