Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
# State keys genuinely missing (the total-images key returned None).
# Ack so NATS stops redelivering and fail the job — there's no state
# left to reconcile against.
_log_missing_state_context(job_id, "process")
_ack_task_via_nats(reply_subject, logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
return
Expand Down Expand Up @@ -364,6 +365,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
# State keys genuinely missing (total-images key returned None). Ack
# first so NATS stops redelivering a message whose state is gone,
# then fail the job. Mirrors the stage=process missing-state path.
_log_missing_state_context(job_id, "results")
_ack_task_via_nats(reply_subject, job.logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
return
Expand Down Expand Up @@ -434,6 +436,46 @@ def _fail_job(job_id: int, reason: str) -> None:
cleanup_async_job_resources(job_id)


def _log_missing_state_context(job_id: int, stage: str) -> None:
    """
    Log diagnostic context for a job whose progress-state key is missing.

    Reads the job's ``status`` and ``created_at`` and emits exactly one log
    line, chosen by what it finds:

    - warning when the job row no longer exists;
    - info when the job is CANCELING or in a final state, i.e. a late result
      for a job that is already wrapping up (benign, e.g. a cancel already
      cleaned up the state);
    - warning, including the job's age in seconds, for any other status. This
      is the case worth investigating (state never seeded, or wiped by a
      re-dispatched run).

    Observation only: nothing is written or changed here. The fix is planned
    in #1337.

    Args:
        job_id: Primary key of the job whose state key was not found.
        stage: Name of the processing stage that hit the missing state; it is
            only echoed into the log line.
    """
    from django.utils import timezone

    from ami.jobs.models import Job, JobState  # avoid circular import

    try:
        snapshot = Job.objects.values("status", "created_at").get(pk=job_id)
    except Job.DoesNotExist:
        logger.warning("Job %s: progress state missing and the job no longer exists (stage=%s).", job_id, stage)
        return

    status = snapshot["status"]
    created_at = snapshot["created_at"]

    # Age stays None when the row carries no creation timestamp.
    age_s = None
    if created_at:
        elapsed = (timezone.now() - created_at).total_seconds()
        age_s = round(elapsed, 1)

    # Intended to match the statuses for which _fail_job does nothing: a job
    # that is already terminal OR cancelling will not actually be failed, so a
    # late result for it is expected cleanup rather than an anomaly.
    settled_statuses = {JobState.CANCELING} | set(JobState.final_states())
    if status in settled_statuses:
        logger.info(
            "Job %s: result arrived after the job already finished (status=%s, stage=%s); ignoring.",
            job_id,
            status,
            stage,
        )
        return

    logger.warning(
        "Job %s: progress state missing while the job is still running "
        "(status=%s, stage=%s, age=%ss); marking it failed.",
        job_id,
        status,
        stage,
        age_s,
    )


def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> bool:
"""
Acknowledge a NATS task. Returns True only when JetStream confirmed the ack.
Expand Down Expand Up @@ -653,6 +695,16 @@ def _update_job_progress(
# Advance summary.status only when the transition fired, else an
# already-terminal job's JSONB would disagree with its column.
job.progress.summary.status = complete_state
else:
# Work completed but the guard found the job already terminal/CANCELING,
# so the completion was not applied. Usually legitimate (a cancel or the
# reaper won the race); if frequent it points to a premature terminal
# verdict. Observation only; see #1337.
job.logger.warning(
"Stage '%s' completed but the job was already in a terminal state; not applying %s.",
stage,
complete_state,
)

# status/finished_at are deliberately NOT in this save() — only the
# guarded UPDATE above writes them. Folding them back in reopens #1337.
Expand Down
Loading