-
Notifications
You must be signed in to change notification settings - Fork 9.6k
feat(workflows): add JSON output for workflow run resume and status #2814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
94f33e6
60f36d4
abfa237
ea95653
9575f2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |
| specify init --here | ||
| """ | ||
|
|
||
| import contextlib | ||
| import os | ||
| import sys | ||
| import zipfile | ||
|
|
@@ -2733,19 +2734,76 @@ def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]: | |
| return inputs | ||
|
|
||
|
|
||
| def _workflow_run_payload(state: Any) -> dict[str, Any]: | ||
| """Machine-readable summary of a run/resume outcome.""" | ||
| return { | ||
| "run_id": state.run_id, | ||
| "workflow_id": state.workflow_id, | ||
| "status": state.status.value, | ||
| "current_step_id": state.current_step_id, | ||
| "current_step_index": state.current_step_index, | ||
| } | ||
|
|
||
|
|
||
| def _emit_workflow_json(payload: dict[str, Any]) -> None: | ||
| """Write a workflow payload as machine-readable JSON to stdout. | ||
|
|
||
| Uses the builtin ``print`` rather than ``console.print`` so Rich | ||
| markup interpretation, syntax highlighting, and line-wrapping can | ||
| never alter the emitted JSON. | ||
| """ | ||
| print(json.dumps(payload, indent=2)) | ||
|
|
||
|
|
||
| @contextlib.contextmanager | ||
| def _stdout_to_stderr_when(active: bool): | ||
| """Redirect everything written to stdout onto stderr while *active*. | ||
|
|
||
| Suppressing the banner and the step-start callback is not enough to | ||
| keep a ``--json`` stream clean: individual steps may still write to | ||
| stdout while the engine runs — the gate step prints its prompt, | ||
| and the prompt step runs a subprocess that inherits the process's | ||
| stdout file descriptor. Either would corrupt the single JSON object. | ||
|
|
||
| Redirecting at the file-descriptor level (``dup2``) captures both | ||
| Python-level writes and inherited-fd subprocess output, so step | ||
| progress lands on stderr (still visible to a human) while stdout | ||
| carries only the emitted JSON. A no-op when *active* is false. | ||
| """ | ||
| if not active: | ||
| yield | ||
| return | ||
| sys.stdout.flush() | ||
| saved_stdout_fd = os.dup(1) | ||
| try: | ||
| os.dup2(2, 1) # fd 1 (stdout) now points at fd 2 (stderr) | ||
| with contextlib.redirect_stdout(sys.stderr): | ||
| yield | ||
| finally: | ||
| sys.stdout.flush() | ||
| os.dup2(saved_stdout_fd, 1) # restore the real stdout | ||
| os.close(saved_stdout_fd) | ||
|
|
||
|
|
||
| @workflow_app.command("run") | ||
| def workflow_run( | ||
| source: str = typer.Argument(..., help="Workflow ID or YAML file path"), | ||
| input_values: list[str] | None = typer.Option( | ||
| None, "--input", "-i", help="Input values as key=value pairs" | ||
| ), | ||
| json_output: bool = typer.Option( | ||
| False, | ||
| "--json", | ||
| help="Emit the run outcome as a single JSON object instead of formatted text.", | ||
| ), | ||
| ): | ||
| """Run a workflow from an installed ID or local YAML path.""" | ||
| from .workflows.engine import WorkflowEngine | ||
|
|
||
| project_root = _require_specify_project() | ||
| engine = WorkflowEngine(project_root) | ||
| engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") | ||
| if not json_output: | ||
| engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch — fixed in abfa237. While |
||
|
|
||
| try: | ||
| definition = engine.load_workflow(source) | ||
|
|
@@ -2767,18 +2825,24 @@ def workflow_run( | |
| # Parse inputs | ||
| inputs = _parse_input_values(input_values) | ||
|
|
||
| console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") | ||
| console.print(f"[dim]Version: {definition.version}[/dim]\n") | ||
| if not json_output: | ||
| console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") | ||
| console.print(f"[dim]Version: {definition.version}[/dim]\n") | ||
|
|
||
| try: | ||
| state = engine.execute(definition, inputs) | ||
| with _stdout_to_stderr_when(json_output): | ||
| state = engine.execute(definition, inputs) | ||
| except ValueError as exc: | ||
| console.print(f"[red]Error:[/red] {exc}") | ||
| raise typer.Exit(1) | ||
| except Exception as exc: | ||
| console.print(f"[red]Workflow failed:[/red] {exc}") | ||
| raise typer.Exit(1) | ||
|
|
||
| if json_output: | ||
| _emit_workflow_json(_workflow_run_payload(state)) | ||
| return | ||
|
|
||
| status_colors = { | ||
| "completed": "green", | ||
| "paused": "yellow", | ||
|
|
@@ -2799,18 +2863,25 @@ def workflow_resume( | |
| input_values: list[str] | None = typer.Option( | ||
| None, "--input", "-i", help="Updated input values as key=value pairs" | ||
| ), | ||
| json_output: bool = typer.Option( | ||
| False, | ||
| "--json", | ||
| help="Emit the resume outcome as a single JSON object instead of formatted text.", | ||
| ), | ||
| ): | ||
| """Resume a paused or failed workflow run.""" | ||
| from .workflows.engine import WorkflowEngine | ||
|
|
||
| project_root = _require_specify_project() | ||
| engine = WorkflowEngine(project_root) | ||
| engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") | ||
| if not json_output: | ||
| engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") | ||
|
|
||
| inputs = _parse_input_values(input_values) | ||
|
|
||
| try: | ||
| state = engine.resume(run_id, inputs or None) | ||
| with _stdout_to_stderr_when(json_output): | ||
| state = engine.resume(run_id, inputs or None) | ||
| except FileNotFoundError: | ||
| console.print(f"[red]Error:[/red] Run not found: {run_id}") | ||
| raise typer.Exit(1) | ||
|
|
@@ -2821,6 +2892,10 @@ def workflow_resume( | |
| console.print(f"[red]Resume failed:[/red] {exc}") | ||
| raise typer.Exit(1) | ||
|
|
||
| if json_output: | ||
| _emit_workflow_json(_workflow_run_payload(state)) | ||
| return | ||
|
|
||
| status_colors = { | ||
| "completed": "green", | ||
| "paused": "yellow", | ||
|
|
@@ -2834,6 +2909,11 @@ def workflow_resume( | |
| @workflow_app.command("status") | ||
| def workflow_status( | ||
| run_id: str | None = typer.Argument(None, help="Run ID to inspect (shows all if omitted)"), | ||
| json_output: bool = typer.Option( | ||
| False, | ||
| "--json", | ||
| help="Emit run status as a single JSON object instead of formatted text.", | ||
| ), | ||
| ): | ||
| """Show workflow run status.""" | ||
| from .workflows.engine import WorkflowEngine | ||
|
|
@@ -2849,6 +2929,21 @@ def workflow_status( | |
| console.print(f"[red]Error:[/red] Run not found: {run_id}") | ||
| raise typer.Exit(1) | ||
|
|
||
| if json_output: | ||
| # Build on the shared run/resume payload so the common fields | ||
| # (including current_step_index) stay identical across commands. | ||
| payload = { | ||
| **_workflow_run_payload(state), | ||
| "created_at": state.created_at, | ||
| "updated_at": state.updated_at, | ||
| "steps": { | ||
| sid: sd.get("status", "unknown") | ||
| for sid, sd in state.step_results.items() | ||
| }, | ||
| } | ||
| _emit_workflow_json(payload) | ||
| return | ||
|
|
||
| status_colors = { | ||
| "completed": "green", | ||
| "paused": "yellow", | ||
|
|
@@ -2876,6 +2971,22 @@ def workflow_status( | |
| console.print(f" [{sc}]●[/{sc}] {step_id}: {s}") | ||
| else: | ||
| runs = engine.list_runs() | ||
|
|
||
| if json_output: | ||
| payload = { | ||
| "runs": [ | ||
| { | ||
| "run_id": r["run_id"], | ||
| "workflow_id": r.get("workflow_id"), | ||
| "status": r.get("status", "unknown"), | ||
| "updated_at": r.get("updated_at"), | ||
| } | ||
| for r in runs | ||
| ] | ||
| } | ||
| _emit_workflow_json(payload) | ||
| return | ||
|
|
||
| if not runs: | ||
| console.print("[yellow]No workflow runs found.[/yellow]") | ||
| return | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.