Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions client/pyroclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,19 @@ def create_detection(

Args:
media: byte data of the picture
bboxes: list of tuples where each tuple is a relative coordinate in order xmin, ymin, xmax, ymax, conf
bboxes: list of tuples where each tuple is a relative coordinate in order xmin, ymin, xmax, ymax, conf.
An empty list reports a frame with no detection: the API attaches it to recently
seen sequences of the pose (keeping their frame timeline continuous) and stores
nothing otherwise (204).
pose_id: pose_id of the detection
crops: optional list of cropped pictures, one per bbox (must align with `bboxes`).
Each crop frames a single object, so its length must equal that of `bboxes`.

Returns:
HTTP response
"""
if not isinstance(bboxes, (list, tuple)) or len(bboxes) == 0 or len(bboxes) > 5:
raise ValueError("bboxes must be a non-empty list of tuples with a maximum of 5 boxes")
if not isinstance(bboxes, (list, tuple)) or len(bboxes) > 5:
raise ValueError("bboxes must be a list of tuples with a maximum of 5 boxes")
if crops is not None and len(crops) != len(bboxes):
raise ValueError("crops must have the same length as bboxes")
data: Dict[str, str] = {
Expand Down
20 changes: 15 additions & 5 deletions client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ def test_cam_workflow(cam_token, cam_pose_id, mock_img):
assert response.status_code == 200, response.__dict__
assert isinstance(response.json()["last_image"], str)
# Check that adding bboxes works
with pytest.raises(ValueError, match="bboxes must be a non-empty list of tuples"):
with pytest.raises(ValueError, match="bboxes must be a list of tuples"):
cam_client.create_detection(mock_img, None, pose_id=cam_pose_id)
with pytest.raises(ValueError, match="bboxes must be a non-empty list of tuples"):
cam_client.create_detection(mock_img, [], pose_id=cam_pose_id)
# An empty frame with no recently-seen sequence is not stored
response = cam_client.create_detection(mock_img, [], pose_id=cam_pose_id)
assert response.status_code == 204, response.__dict__
response = cam_client.create_detection(mock_img, [(0, 0, 1.0, 0.9, 0.5)], pose_id=cam_pose_id)
assert response.status_code == 201, response.__dict__
response = cam_client.create_detection(
Expand All @@ -114,7 +115,13 @@ def test_cam_workflow(cam_token, cam_pose_id, mock_img):
pose_id=cam_pose_id,
crops=[mock_img],
)
return response.json()["id"]
detection_id = response.json()["id"]
# An empty frame extends the freshly created sequence with a continuity detection
response = cam_client.create_detection(mock_img, [], pose_id=cam_pose_id)
assert response.status_code == 201, response.__dict__
assert response.json()["bbox"] == "[]"
assert isinstance(response.json()["sequence_id"], int)
return detection_id


def test_agent_workflow(test_cam_workflow, agent_token):
Expand Down Expand Up @@ -155,4 +162,7 @@ def test_user_workflow(test_cam_workflow, user_token):
assert len(response.json()) == 1
response = user_client.fetch_sequences_detections(response.json()[0]["id"])
assert response.status_code == 200, response.__dict__
assert len(response.json()) == 4
# 4 real detections + the continuity row added by the empty frame in test_cam_workflow
detections = response.json()
assert len(detections) == 5
assert sum(det["bbox"] == "[]" for det in detections) == 1
105 changes: 86 additions & 19 deletions src/app/api/api_v1/endpoints/detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
from ast import literal_eval
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast

import pandas as pd
from fastapi import (
Expand All @@ -18,6 +18,7 @@
Form,
HTTPException,
Path,
Response,
Security,
UploadFile,
status,
Expand All @@ -40,7 +41,7 @@
from app.schemas.detections import (
BOX_PATTERN,
BOXES_PATTERN,
COMPILED_BOXES_PATTERN,
EMPTY_BBOXES,
DetectionCreate,
DetectionRead,
DetectionSequence,
Expand Down Expand Up @@ -92,20 +93,52 @@ async def _get_last_bbox_for_sequence(
detections: DetectionCRUD,
sequence_id: int,
) -> Optional[Tuple[float, float, float, float, float]]:
dets = await detections.fetch_all(
filters=("sequence_id", sequence_id),
order_by="created_at",
order_desc=True,
limit=1,
)
if not dets:
# Continuity rows (empty bbox) are skipped: spatial matching must compare against the
# last real evidence, not against a frame where nothing was detected.
det = await detections.get_latest_with_bbox(sequence_id)
if det is None:
return None
bbox_strs = _extract_bbox_strings(dets[0].bbox)
bbox_strs = _extract_bbox_strings(det.bbox)
if not bbox_strs:
return None
return _parse_bbox(bbox_strs[0])


async def _get_continuity_sequences(
sequences: SequenceCRUD,
camera_id: int,
pose_id: int,
) -> List[Sequence]:
"""Sequences of the pose seen recently enough for an unmatched frame to be attached to them."""
return await sequences.fetch_all(
filters=[("camera_id", camera_id), ("pose_id", pose_id)],
inequality_pair=(
"last_seen_at",
">",
utcnow() - timedelta(seconds=settings.SEQUENCE_CONTINUITY_SECONDS),
),
)


async def _create_continuity_detection(
detections: DetectionCRUD,
camera_id: int,
pose_id: int,
bucket_key: str,
sequence_id: int,
) -> Detection:
"""Attach a frame to a sequence whose object was not detected on it (empty bbox).

Continuity rows keep the sequence's frame timeline gapless for the temporal model.
They never refresh last_seen_at nor max_conf: the sequence's lifetime and confidence
track real evidence only.
"""
det = await detections.create(
DetectionCreate(camera_id=camera_id, pose_id=pose_id, bucket_key=bucket_key, bbox=EMPTY_BBOXES)
)
return await detections.update(det.id, DetectionSequence(sequence_id=sequence_id))


async def _get_camera_by_id(
camera: Camera,
cameras: CameraCRUD,
Expand Down Expand Up @@ -340,7 +373,14 @@ async def _attach_sequence_to_alert(
return alert_id


@router.post("/", status_code=status.HTTP_201_CREATED, summary="Register a new wildfire detection")
@router.post(
"/",
status_code=status.HTTP_201_CREATED,
summary="Register a new wildfire detection",
# The return annotation is not a valid response-model type (a 204 Response is returned
# for an empty frame extending no sequence), so the model is declared explicitly.
response_model=DetectionRead,
)
async def create_detection(
bboxes: str = Form(
...,
Expand All @@ -357,11 +397,13 @@ async def create_detection(
cameras: CameraCRUD = Depends(get_camera_crud),
poses: PoseCRUD = Depends(get_pose_crud),
token_payload: TokenPayload = Security(get_jwt, scopes=[Role.CAMERA]),
) -> Detection:
) -> Union[Detection, Response]:
telemetry_client.capture(f"camera|{token_payload.sub}", event="detections-create")

# Throw an error if the format is invalid and can't be captured by the regex
if any(box[0] >= box[2] or box[1] >= box[3] for box in COMPILED_BOXES_PATTERN.findall(bboxes)):
# The Form regex already constrains the format; parse to validate coordinate ordering on
# every box (an empty list parses to no box at all and is a valid frame with no detection).
bbox_strings = _extract_bbox_strings(bboxes)
if any(box[0] >= box[2] or box[1] >= box[3] for box in map(_parse_bbox, bbox_strings)):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="xmin & ymin are expected to be respectively smaller than xmax & ymax",
Expand All @@ -374,10 +416,6 @@ async def create_detection(
if not pose.active:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Pose is not active.")

bbox_strings = _extract_bbox_strings(bboxes)
if not bbox_strings:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid bbox format.")

# Validate crop/bbox alignment before any S3 upload to avoid orphan objects.
# Each crop frames a single object, so there must be exactly one crop per bbox (or none at all).
crops = crop_files or []
Expand All @@ -387,6 +425,22 @@ async def create_detection(
detail="Number of crops must match the number of bboxes.",
)

# Frame with no detection: it only matters as continuity for recently-seen sequences of
# the pose. Without one, store nothing at all — empty frames must never seed a sequence
# (the historical placeholder bboxes did, creating phantom sequences).
if not bbox_strings:
continuity_sequences = await _get_continuity_sequences(sequences, token_payload.sub, pose_id)
if not continuity_sequences:
return Response(status_code=status.HTTP_204_NO_CONTENT)
bucket_key = await upload_file(file, token_payload.organization_id, token_payload.sub)
continuity_dets: List[Detection] = []
for seq in continuity_sequences:
continuity_dets.append(
await _create_continuity_detection(detections, token_payload.sub, pose_id, bucket_key, seq.id)
)
await sequences.enqueue_validation(seq.id)
return DetectionRead(**continuity_dets[0].model_dump())

# Upload media
bucket_key = await upload_file(file, token_payload.organization_id, token_payload.sub)
crop_bucket_keys: List[Optional[str]] = [None] * len(bbox_strings)
Expand Down Expand Up @@ -496,6 +550,15 @@ async def create_detection(

created.append(det)

# Continuity pass: a recently-seen sequence of this pose whose object was not detected on
# this frame (no bbox matched it, e.g. one of two smokes faded) still gets the frame,
# attached with an empty bbox, so its frame timeline stays gapless for the temporal model.
for seq in await _get_continuity_sequences(sequences, token_payload.sub, pose_id):
if seq.id in affected_sequences:
continue
await _create_continuity_detection(detections, token_payload.sub, pose_id, bucket_key, seq.id)
affected_sequences.add(seq.id)

# Mark touched sequences due for validation (idempotent: one queue entry per sequence,
# whichever uvicorn worker received the detection). The per-process validation worker
# claims due sequences from the DB and runs the gated pipeline: triangulation and ALL
Expand Down Expand Up @@ -576,7 +639,11 @@ async def delete_detection(
detection = cast(Detection, await detections.get(detection_id, strict=True))
camera = cast(Camera, await cameras.get(detection.camera_id, strict=True))
bucket = s3_service.get_bucket(s3_service.resolve_bucket_name(camera.organization_id))
bucket.delete_file(detection.bucket_key)
# The frame object is shared by every detection of the same upload (multi-bbox siblings,
# continuity rows): only delete it once no other row references it. Crops are per-row.
sharing = await detections.fetch_all(filters=("bucket_key", detection.bucket_key))
if all(d.id == detection_id for d in sharing):
bucket.delete_file(detection.bucket_key)
if detection.crop_bucket_key:
bucket.delete_file(detection.crop_bucket_key)
await detections.delete(detection_id)
3 changes: 3 additions & 0 deletions src/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def sqlachmey_uri(cls, v: str) -> str:
SEQUENCE_RELAXATION_SECONDS: int = int(os.environ.get("SEQUENCE_RELAXATION_SECONDS") or 120 * 60)
SEQUENCE_MIN_INTERVAL_DETS: int = int(os.environ.get("SEQUENCE_MIN_INTERVAL_DETS") or 3)
SEQUENCE_MIN_INTERVAL_SECONDS: int = int(os.environ.get("SEQUENCE_MIN_INTERVAL_SECONDS") or 5 * 60)
# Window after a sequence's last real detection during which a frame with no matching bbox
# is still attached to it (with an empty bbox) to keep the frame timeline continuous.
SEQUENCE_CONTINUITY_SECONDS: int = int(os.environ.get("SEQUENCE_CONTINUITY_SECONDS") or 2 * 60)
TRIANGULATION_RELAXATION_SECONDS: int = int(os.environ.get("TRIANGULATION_RELAXATION_SECONDS") or 30 * 60)
ALERT_MERGE_MAX_DISTANCE_KM: float = float(os.environ.get("ALERT_MERGE_MAX_DISTANCE_KM") or 2.0)

Expand Down
18 changes: 17 additions & 1 deletion src/app/crud/crud_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,31 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

from typing import Any, Union, cast

from sqlalchemy import desc
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.crud.base import BaseCRUD
from app.models import Detection
from app.schemas.detections import DetectionCreate, DetectionSequence
from app.schemas.detections import EMPTY_BBOXES, DetectionCreate, DetectionSequence

__all__ = ["DetectionCRUD"]


class DetectionCRUD(BaseCRUD[Detection, DetectionCreate, DetectionSequence]):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session, Detection)

async def get_latest_with_bbox(self, sequence_id: int) -> Union[Detection, None]:
"""Latest detection of the sequence carrying a real bbox (continuity rows excluded)."""
statement: Any = (
select(Detection)
.where(cast(Any, Detection.sequence_id) == sequence_id)
.where(cast(Any, Detection.bbox) != EMPTY_BBOXES)
.order_by(desc(cast(Any, Detection.created_at)))
.limit(1)
)
results = await self.session.exec(statement)
return results.first()
6 changes: 5 additions & 1 deletion src/app/schemas/detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ class DetectionLabel(BaseModel):
# Regex for a float between 0 and 1, with a maximum of 3 decimals
FLOAT_PATTERN = r"(0?\.[0-9]{1,3}|0|1)"
BOX_PATTERN = rf"\({FLOAT_PATTERN},{FLOAT_PATTERN},{FLOAT_PATTERN},{FLOAT_PATTERN},{FLOAT_PATTERN}\)"
BOXES_PATTERN = rf"^\[{BOX_PATTERN}(,{BOX_PATTERN})*\]$"
# An empty list is valid: a frame with no detection (kept for sequence continuity).
BOXES_PATTERN = rf"^\[({BOX_PATTERN}(,{BOX_PATTERN})*)?\]$"
COMPILED_BOXES_PATTERN = re.compile(BOXES_PATTERN)

# Stored bbox of a continuity detection: a frame attached to a sequence with no detection on it.
EMPTY_BBOXES = "[]"


class DetectionCreate(BaseModel):
camera_id: int = Field(..., gt=0)
Expand Down
3 changes: 3 additions & 0 deletions src/app/services/sequence_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
from sqlmodel.ext.asyncio.session import AsyncSession

from app.models import Detection
from app.schemas.detections import EMPTY_BBOXES


async def get_detection_counts_by_sequence_ids(session: AsyncSession, sequence_ids: List[int]) -> Dict[int, int]:
if not sequence_ids:
return {}

# Continuity rows (empty bbox) carry a frame, not a detection: don't count them.
stmt: Any = (
select(cast(Any, Detection.sequence_id), func.count(cast(Any, Detection.id)))
.where(cast(Any, Detection.sequence_id).in_(sequence_ids))
.where(cast(Any, Detection.bbox) != EMPTY_BBOXES)
.group_by(cast(Any, Detection.sequence_id))
)
res = await session.exec(stmt)
Expand Down
8 changes: 3 additions & 5 deletions src/app/services/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,10 @@ async def _notify_for_sequence(sequence_id: int, organization_id: int, alert_id:
if sequence_ is None:
return
camera = await CameraCRUD(session).get(sequence_.camera_id)
dets = await detections.fetch_all(
filters=("sequence_id", sequence_id), order_by="created_at", order_desc=True, limit=1
)
if camera is None or not dets:
# Latest real detection: continuity rows (empty bbox) must never reach a channel.
det = await detections.get_latest_with_bbox(sequence_id)
if camera is None or det is None:
return
det = dets[0]
org = await OrganizationCRUD(session).get(organization_id)

for webhook in await WebhookCRUD(session).fetch_all():
Expand Down
Loading
Loading