Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion backend/packages/harness/deerflow/runtime/runs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,39 @@
"""Run lifecycle management for LangGraph Platform API compatibility."""

from .domain import (
AssistantId,
CancelAction,
DisconnectMode,
EventSeq,
InvalidRunTransition,
MultitaskStrategy,
Run,
RunId,
RunScope,
RunStatus,
ThreadId,
UserId,
)
from .manager import ConflictError, RunManager, RunRecord, UnsupportedStrategyError
from .schemas import DisconnectMode, RunStatus
from .worker import RunContext, run_agent

__all__ = [
"AssistantId",
"CancelAction",
"ConflictError",
"DisconnectMode",
"EventSeq",
"InvalidRunTransition",
"MultitaskStrategy",
"Run",
"RunContext",
"RunId",
"RunManager",
"RunRecord",
"RunScope",
"RunStatus",
"ThreadId",
"UnsupportedStrategyError",
"UserId",
"run_agent",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Application-layer DTOs and services for run runtime use cases."""

from .commands import CancelRunCommand, CreateRunCommand, JoinRunStreamCommand
from .dto import RunMessageView, RunSnapshot, RunStreamHandle, StoredRunEvent
from .queries import GetRunQuery, ListRunMessagesQuery, ListRunsQuery
from .services import RunsApplicationService

__all__ = [
"CancelRunCommand",
"CreateRunCommand",
"GetRunQuery",
"JoinRunStreamCommand",
"ListRunMessagesQuery",
"ListRunsQuery",
"RunMessageView",
"RunSnapshot",
"RunStreamHandle",
"RunsApplicationService",
"StoredRunEvent",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Application command DTOs for run use cases."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Literal

from ..domain import AssistantId, CancelAction, DisconnectMode, MultitaskStrategy, RunId, RunScope, ThreadId


@dataclass(frozen=True)
class CreateRunCommand:
thread_id: ThreadId
assistant_id: AssistantId | None = None
input: dict[str, Any] | None = None
command: dict[str, Any] | None = None
metadata: dict[str, Any] = field(default_factory=dict)
config: dict[str, Any] = field(default_factory=dict)
context: dict[str, Any] = field(default_factory=dict)
scope: RunScope = RunScope.stateful
on_disconnect: DisconnectMode = DisconnectMode.cancel
multitask_strategy: MultitaskStrategy = MultitaskStrategy.reject
stream_mode: list[str] | str | None = None
stream_subgraphs: bool = False
interrupt_before: list[str] | Literal["*"] | None = None
interrupt_after: list[str] | Literal["*"] | None = None


@dataclass(frozen=True)
class CancelRunCommand:
run_id: RunId
action: CancelAction = CancelAction.interrupt
wait: bool = False


@dataclass(frozen=True)
class JoinRunStreamCommand:
run_id: RunId
last_event_id: str | None = None


__all__ = [
"CancelRunCommand",
"CreateRunCommand",
"JoinRunStreamCommand",
]
76 changes: 76 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/application/dto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Application output DTOs for run use cases."""

from __future__ import annotations

from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import Any

from ..domain import AssistantId, EventSeq, Run, RunId, RunStatus, ThreadId


@dataclass(frozen=True)
class RunSnapshot:
run_id: RunId
thread_id: ThreadId
assistant_id: AssistantId | None = None
status: RunStatus = RunStatus.pending
metadata: dict[str, Any] = field(default_factory=dict)
kwargs: dict[str, Any] = field(default_factory=dict)
created_at: str = ""
updated_at: str = ""
error: str | None = None
model_name: str | None = None

@classmethod
def from_run(cls, run: Run) -> RunSnapshot:
return cls(
run_id=run.run_id,
thread_id=run.thread_id,
assistant_id=run.assistant_id,
status=run.status,
metadata=dict(run.metadata),
kwargs=dict(run.kwargs),
created_at=run.created_at,
updated_at=run.updated_at,
error=run.error,
model_name=run.model_name,
)


@dataclass(frozen=True)
class RunMessageView:
thread_id: ThreadId
run_id: RunId
seq: EventSeq
event_type: str
content: str | dict[str, Any] = ""
metadata: dict[str, Any] = field(default_factory=dict)
created_at: str = ""


@dataclass(frozen=True)
class StoredRunEvent:
thread_id: ThreadId
run_id: RunId
seq: EventSeq
event_type: str
category: str
content: str | dict[str, Any] = ""
metadata: dict[str, Any] = field(default_factory=dict)
created_at: str = ""


@dataclass(frozen=True)
class RunStreamHandle:
run_id: RunId
thread_id: ThreadId
events: AsyncIterator[Any]


__all__ = [
"RunMessageView",
"RunSnapshot",
"RunStreamHandle",
"StoredRunEvent",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Application query DTOs for run use cases."""

from __future__ import annotations

from dataclasses import dataclass

from ..domain import RunId, ThreadId, UserId


@dataclass(frozen=True)
class GetRunQuery:
run_id: RunId
thread_id: ThreadId | None = None
user_id: UserId | None = None


@dataclass(frozen=True)
class ListRunsQuery:
thread_id: ThreadId
user_id: UserId | None = None
limit: int = 100


@dataclass(frozen=True)
class ListRunMessagesQuery:
thread_id: ThreadId
run_id: RunId
limit: int = 50
before_seq: int | None = None
after_seq: int | None = None


__all__ = [
"GetRunQuery",
"ListRunMessagesQuery",
"ListRunsQuery",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Application service skeleton for run use cases."""

from __future__ import annotations

from dataclasses import dataclass

from ..execution import RunExecutionScheduler, RunSupervisor
from ..repositories import RunEventLog, RunRepository
from ..streams import RunStreamBroker
from .commands import CancelRunCommand, CreateRunCommand, JoinRunStreamCommand
from .dto import RunMessageView, RunSnapshot, RunStreamHandle
from .queries import GetRunQuery, ListRunMessagesQuery, ListRunsQuery


@dataclass
class RunsApplicationService:
"""Use-case orchestration boundary for run runtime operations.

PR1 only introduces the boundary and dependency shape. Existing Gateway
handlers continue to call the legacy service functions until later PRs move
behavior into this class.
"""

run_repository: RunRepository
run_event_log: RunEventLog
stream_broker: RunStreamBroker
scheduler: RunExecutionScheduler
supervisor: RunSupervisor

async def create_background(self, command: CreateRunCommand) -> RunSnapshot:
# PR1 defines the application boundary; later PRs move Gateway runtime
# behavior behind this method.
raise NotImplementedError("RunsApplicationService is not wired in PR1")

async def create_and_stream(self, command: CreateRunCommand) -> RunStreamHandle:
raise NotImplementedError("RunsApplicationService is not wired in PR1")

async def create_and_wait(self, command: CreateRunCommand) -> RunSnapshot:
raise NotImplementedError("RunsApplicationService is not wired in PR1")

async def join_stream(self, command: JoinRunStreamCommand) -> RunStreamHandle:
raise NotImplementedError("RunsApplicationService is not wired in PR1")

async def cancel(self, command: CancelRunCommand) -> bool:
return await self.supervisor.cancel(command.run_id, action=command.action)

async def get_run(self, query: GetRunQuery) -> RunSnapshot | None:
run = await self.run_repository.get(query.run_id, user_id=query.user_id)
if run is None:
return None
if query.thread_id is not None and run.thread_id != query.thread_id:
return None
return RunSnapshot.from_run(run)

async def list_runs(self, query: ListRunsQuery) -> list[RunSnapshot]:
return await self.run_repository.list_by_thread(
query.thread_id,
user_id=query.user_id,
limit=query.limit,
)

async def list_run_messages(self, query: ListRunMessagesQuery) -> list[RunMessageView]:
return await self.run_event_log.list_messages_by_run(
query.thread_id,
query.run_id,
limit=query.limit,
before_seq=query.before_seq,
after_seq=query.after_seq,
)


__all__ = [
"RunsApplicationService",
]
33 changes: 33 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/domain/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Run runtime domain model."""

from .errors import InvalidRunTransition, RunDomainError
from .events import RunCancelled, RunCompleted, RunCreated, RunEvent, RunFailed, RunStarted
from .identifiers import AssistantId, RunId, ThreadId, UserId
from .model import Run
from .policies import CancelPolicy, MultitaskDecision, MultitaskPolicy
from .value_objects import CancelAction, DisconnectMode, EventSeq, MultitaskStrategy, RunScope, RunStatus

__all__ = [
"AssistantId",
"CancelAction",
"CancelPolicy",
"DisconnectMode",
"EventSeq",
"InvalidRunTransition",
"MultitaskDecision",
"MultitaskPolicy",
"MultitaskStrategy",
"Run",
"RunCancelled",
"RunCompleted",
"RunCreated",
"RunDomainError",
"RunEvent",
"RunFailed",
"RunId",
"RunScope",
"RunStarted",
"RunStatus",
"ThreadId",
"UserId",
]
24 changes: 24 additions & 0 deletions backend/packages/harness/deerflow/runtime/runs/domain/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Domain-level errors for run lifecycle operations."""

from __future__ import annotations

from .value_objects import RunStatus


class RunDomainError(Exception):
"""Base class for run runtime domain errors."""


class InvalidRunTransition(RunDomainError):
"""Raised when a run status transition violates lifecycle rules."""

def __init__(self, current: RunStatus, target: RunStatus) -> None:
super().__init__(f"Cannot transition run from {current.value!r} to {target.value!r}")
self.current = current
self.target = target


__all__ = [
"InvalidRunTransition",
"RunDomainError",
]
Loading
Loading