From 6364294795ec89de9a5d93765dd8b9d49ab1df01 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 18 Jun 2026 23:59:24 -0700 Subject: [PATCH 1/4] feat(jobs): log premature-terminal and missing-state failures for diagnosis MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Observation-only follow-up to #1338/#1342. Now that terminal status transitions are irreversible, surface the two cases where a terminal verdict may have been wrong, instead of letting them disappear silently: 1. When work completes for a job the guard finds already terminal/CANCELING, log a warning. Often legitimate (cancel/reaper won the race) but, if frequent, the signal of a premature terminal verdict. 2. When a result is failed because the job's Redis state is missing, log the job age/status/dispatch first. A small age points to a not-yet-seeded or redelivered-run_job race rather than genuine cleanup. No behaviour change — both warnings sit on existing code paths. Lets us confirm the trigger before adding grace/idempotency logic (see PR body follow-up). Refs #1337, #1219, #1324. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 181b45301..b80d30826 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -282,6 +282,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub # State keys genuinely missing (the total-images key returned None). # Ack so NATS stops redelivering and fail the job — there's no state # left to reconcile against. + _log_missing_state_context(job_id, "process") _ack_task_via_nats(reply_subject, logger) _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") return @@ -364,6 +365,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub # State keys genuinely missing (total-images key returned None). Ack # first so NATS stops redelivering a message whose state is gone, # then fail the job. Mirrors the stage=process missing-state path. + _log_missing_state_context(job_id, "results") _ack_task_via_nats(reply_subject, job.logger) _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") return @@ -434,6 +436,35 @@ def _fail_job(job_id: int, reason: str) -> None: cleanup_async_job_resources(job_id) +def _log_missing_state_context(job_id: int, stage: str) -> None: + """Log context when a result arrives but the job's Redis state is absent. + + A missing total-images key is treated as fatal by the result handler, but it + conflates three cases: genuinely cleaned up (end of life), never seeded yet + (startup race), and wiped by a duplicate/redelivered run_job re-running + initialize_job. Record the job's age and status so we can tell which one is + actually happening before adding grace logic. Observation only; see #1337. + """ + from django.utils import timezone + + from ami.jobs.models import Job # avoid circular import + + try: + row = Job.objects.values("status", "dispatch_mode", "created_at").get(pk=job_id) + age = (timezone.now() - row["created_at"]).total_seconds() if row["created_at"] else None + logger.warning( + "Missing Redis state for job %s (stage=%s): status=%s dispatch=%s age_s=%s. Failing job. " + "A small age points to a not-yet-seeded or redispatch race rather than genuine cleanup. See #1337.", + job_id, + stage, + row["status"], + row["dispatch_mode"], + round(age, 1) if age is not None else None, + ) + except Job.DoesNotExist: + logger.warning("Missing Redis state for job %s (stage=%s) and the job row is gone. See #1337.", job_id, stage) + + def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> bool: """ Acknowledge a NATS task. Returns True only when JetStream confirmed the ack. @@ -653,6 +684,21 @@ def _update_job_progress( # Advance summary.status only when the transition fired, else an # already-terminal job's JSONB would disagree with its column. job.progress.summary.status = complete_state + else: + # The work completed but the guard found the job already terminal/ + # CANCELING, so the completion was not applied. Often legitimate (a + # user cancel or the reaper genuinely won the race), but a frequent + # occurrence signals a PREMATURE terminal verdict — e.g. the reaper + # revoked a slow-but-alive job and its results then landed. Logged so + # these false-positive terminals are visible instead of silently + # swallowed by the guard. Observation only; see #1337. + current = Job.objects.filter(pk=job_id).values_list("status", flat=True).first() + logger.warning( + "Job %s completed work after it was already terminal (status=%s); " + "completion not applied. If frequent, the terminal verdict may be premature. See #1337.", + job_id, + current, + ) # status/finished_at are deliberately NOT in this save() — only the # guarded UPDATE above writes them. Folding them back in reopens #1337. From 0f52c1707aaaafa7340c7aa3750003a47ac6fb16 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 19 Jun 2026 00:40:13 -0700 Subject: [PATCH 2/4] fix(jobs): downgrade missing-state log to info when the job is already terminal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The missing-state diagnostic logged a WARNING saying 'Failing job' for every in-flight result that arrived after a job finished — but _fail_job no-ops on a terminal job, so after a cancel (which deletes the Redis state) this fired once per in-flight batch and misdescribed normal cleanup as a failure. Now: a terminal job logs at info ('ignoring in-flight result for already-terminal job'); only a NON-terminal job with missing state logs the warning, which is the case actually worth investigating. Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index b80d30826..d950b35b8 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -447,22 +447,43 @@ def _log_missing_state_context(job_id: int, stage: str) -> None: """ from django.utils import timezone - from ami.jobs.models import Job # avoid circular import + from ami.jobs.models import Job, JobState # avoid circular import try: row = Job.objects.values("status", "dispatch_mode", "created_at").get(pk=job_id) - age = (timezone.now() - row["created_at"]).total_seconds() if row["created_at"] else None + except Job.DoesNotExist: + logger.warning("Missing Redis state for job %s (stage=%s) and the job row is gone. See #1337.", job_id, stage) + return + + age = (timezone.now() - row["created_at"]).total_seconds() if row["created_at"] else None + age_s = round(age, 1) if age is not None else None + + if row["status"] in JobState.final_states(): + # Expected: an in-flight result arriving after the job already finished + # (e.g. a cancel deleted the Redis state). The _fail_job call below no-ops + # on a terminal job, so this is normal post-terminal cleanup, not an + # anomaly — info, not warning. + logger.info( + "Ignoring in-flight result for already-terminal job %s (stage=%s, status=%s, age_s=%s). See #1337.", + job_id, + stage, + row["status"], + age_s, + ) + else: + # The job is NOT terminal but its state is gone — this is the case worth + # investigating. A small age points to a not-yet-seeded or redispatch + # race rather than genuine cleanup. logger.warning( - "Missing Redis state for job %s (stage=%s): status=%s dispatch=%s age_s=%s. Failing job. " - "A small age points to a not-yet-seeded or redispatch race rather than genuine cleanup. See #1337.", + "Missing Redis state for non-terminal job %s (stage=%s): status=%s dispatch=%s age_s=%s. " + "Failing job. A small age points to a not-yet-seeded or redispatch race rather than genuine cleanup. " + "See #1337.", job_id, stage, row["status"], row["dispatch_mode"], - round(age, 1) if age is not None else None, + age_s, ) - except Job.DoesNotExist: - logger.warning("Missing Redis state for job %s (stage=%s) and the job row is gone. See #1337.", job_id, stage) def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> bool: From bc8652d9f3e4729db9fe3ac857769b1d5077899f Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 19 Jun 2026 14:36:51 -0700 Subject: [PATCH 3/4] refactor(jobs): make the diagnostic log lines operator-readable and leaner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The missing-state and completed-after-terminal logs read like insider notes — ticket numbers and race-theory in the runtime message. Move the rationale and the issue reference into code comments and make the log lines plain operational statements an operator can act on without chasing a ticket. Also drop the redundant dispatch_mode field and the extra status re-query. Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 53 +++++++++++++++-------------------------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index d950b35b8..8c7969140 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -437,51 +437,38 @@ def _fail_job(job_id: int, reason: str) -> None: def _log_missing_state_context(job_id: int, stage: str) -> None: - """Log context when a result arrives but the job's Redis state is absent. - - A missing total-images key is treated as fatal by the result handler, but it - conflates three cases: genuinely cleaned up (end of life), never seeded yet - (startup race), and wiped by a duplicate/redelivered run_job re-running - initialize_job. Record the job's age and status so we can tell which one is - actually happening before adding grace logic. Observation only; see #1337. - """ + # Diagnostic for a missing progress-state key. Status + age separate the two + # cases: a terminal job means a late result arriving after the job finished + # (benign — e.g. a cancel already cleaned up the state); a still-running job + # with no state is the one worth investigating (state never seeded, or wiped + # by a re-dispatched run). Observation only; the fix is planned in #1337. from django.utils import timezone from ami.jobs.models import Job, JobState # avoid circular import try: - row = Job.objects.values("status", "dispatch_mode", "created_at").get(pk=job_id) + row = Job.objects.values("status", "created_at").get(pk=job_id) except Job.DoesNotExist: - logger.warning("Missing Redis state for job %s (stage=%s) and the job row is gone. See #1337.", job_id, stage) + logger.warning("Job %s: progress state missing and the job no longer exists (stage=%s).", job_id, stage) return age = (timezone.now() - row["created_at"]).total_seconds() if row["created_at"] else None age_s = round(age, 1) if age is not None else None if row["status"] in JobState.final_states(): - # Expected: an in-flight result arriving after the job already finished - # (e.g. a cancel deleted the Redis state). The _fail_job call below no-ops - # on a terminal job, so this is normal post-terminal cleanup, not an - # anomaly — info, not warning. logger.info( - "Ignoring in-flight result for already-terminal job %s (stage=%s, status=%s, age_s=%s). See #1337.", + "Job %s: result arrived after the job already finished (status=%s, stage=%s); ignoring.", job_id, - stage, row["status"], - age_s, + stage, ) else: - # The job is NOT terminal but its state is gone — this is the case worth - # investigating. A small age points to a not-yet-seeded or redispatch - # race rather than genuine cleanup. logger.warning( - "Missing Redis state for non-terminal job %s (stage=%s): status=%s dispatch=%s age_s=%s. " - "Failing job. A small age points to a not-yet-seeded or redispatch race rather than genuine cleanup. " - "See #1337.", + "Job %s: progress state missing while the job is still running " + "(status=%s, stage=%s, age=%ss); marking it failed.", job_id, - stage, row["status"], - row["dispatch_mode"], + stage, age_s, ) @@ -706,19 +693,13 @@ def _update_job_progress( # already-terminal job's JSONB would disagree with its column. job.progress.summary.status = complete_state else: - # The work completed but the guard found the job already terminal/ - # CANCELING, so the completion was not applied. Often legitimate (a - # user cancel or the reaper genuinely won the race), but a frequent - # occurrence signals a PREMATURE terminal verdict — e.g. the reaper - # revoked a slow-but-alive job and its results then landed. Logged so - # these false-positive terminals are visible instead of silently - # swallowed by the guard. Observation only; see #1337. - current = Job.objects.filter(pk=job_id).values_list("status", flat=True).first() + # Work completed but the guard found the job already terminal/CANCELING, + # so the completion was not applied. Usually legitimate (a cancel or the + # reaper won the race); if frequent it points to a premature terminal + # verdict. Observation only; see #1337. logger.warning( - "Job %s completed work after it was already terminal (status=%s); " - "completion not applied. If frequent, the terminal verdict may be premature. See #1337.", + "Job %s: work completed but the job was already in a terminal state; completion not applied.", job_id, - current, ) # status/finished_at are deliberately NOT in this save() — only the From 7b848cb9eb770ed63939ccd8aa750aab358cbf47 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 19 Jun 2026 14:47:16 -0700 Subject: [PATCH 4/4] refactor(jobs): address review on missing-state diagnostics - Treat CANCELING as terminal-like in the missing-state classification so a cancel-in-flight result logs the benign info line instead of the misleading 'still running / marking it failed' warning (matches _fail_job's no-op set). Caught by CodeRabbit and Copilot. - Rename the values() dict from 'row' to 'job_values' (per review). - Log the completed-after-terminal case via job.logger and include the stage and attempted terminal state, without an extra status re-query (per Copilot). Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 8c7969140..dbf57fa38 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -447,19 +447,22 @@ def _log_missing_state_context(job_id: int, stage: str) -> None: from ami.jobs.models import Job, JobState # avoid circular import try: - row = Job.objects.values("status", "created_at").get(pk=job_id) + job_values = Job.objects.values("status", "created_at").get(pk=job_id) except Job.DoesNotExist: logger.warning("Job %s: progress state missing and the job no longer exists (stage=%s).", job_id, stage) return - age = (timezone.now() - row["created_at"]).total_seconds() if row["created_at"] else None + age = (timezone.now() - job_values["created_at"]).total_seconds() if job_values["created_at"] else None age_s = round(age, 1) if age is not None else None - if row["status"] in JobState.final_states(): + # Mirror _fail_job's no-op set: a job that is already terminal OR cancelling + # will not actually be failed, so a late result for it is expected cleanup, + # not an anomaly. + if job_values["status"] in {JobState.CANCELING, *JobState.final_states()}: logger.info( "Job %s: result arrived after the job already finished (status=%s, stage=%s); ignoring.", job_id, - row["status"], + job_values["status"], stage, ) else: @@ -467,7 +470,7 @@ def _log_missing_state_context(job_id: int, stage: str) -> None: "Job %s: progress state missing while the job is still running " "(status=%s, stage=%s, age=%ss); marking it failed.", job_id, - row["status"], + job_values["status"], stage, age_s, ) @@ -697,9 +700,10 @@ def _update_job_progress( # so the completion was not applied. Usually legitimate (a cancel or the # reaper won the race); if frequent it points to a premature terminal # verdict. Observation only; see #1337. - logger.warning( - "Job %s: work completed but the job was already in a terminal state; completion not applied.", - job_id, + job.logger.warning( + "Stage '%s' completed but the job was already in a terminal state; not applying %s.", + stage, + complete_state, ) # status/finished_at are deliberately NOT in this save() — only the