Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/reference/workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ specify workflow run <source>
| Option | Description |
| ------------------- | -------------------------------------------------------- |
| `-i` / `--input` | Pass input values as `key=value` (repeatable) |
| `--json` | Emit the run outcome as a single JSON object |

Runs a workflow from a catalog ID, URL, or local file path. Inputs declared by the workflow can be provided via `--input` or will be prompted interactively.

Expand All @@ -20,6 +21,24 @@ Example:
specify workflow run speckit -i spec="Build a kanban board with drag-and-drop task management" -i scope=full
```

With `--json`, a single machine-readable object is printed instead of formatted text (the default output is unchanged when the flag is omitted):

```bash
specify workflow run my-pipeline.yml --json
```

```json
{
"run_id": "662bf791",
"workflow_id": "build-and-review",
"status": "paused",
"current_step_id": "review",
"current_step_index": 0
}
```

`workflow_id` is the `workflow.id` declared inside the YAML, not the file name. The object is printed exactly as shown — pretty-printed with two-space indentation, on plain stdout with no Rich markup — so it always parses.

> **Note:** All workflow commands require a project already initialized with `specify init`.

## Resume a Workflow
Expand All @@ -31,6 +50,7 @@ specify workflow resume <run_id>
| Option | Description |
| ------------------- | -------------------------------------------------------- |
| `-i` / `--input` | Updated input values as `key=value` (repeatable) |
| `--json` | Emit the resume outcome as a single JSON object |

Resumes a paused or failed workflow run from the exact step where it stopped. Useful after responding to a gate step or fixing an issue that caused a failure.

Expand All @@ -46,6 +66,10 @@ specify workflow resume <run_id> --input cmd="exit 0"
specify workflow status [<run_id>]
```

| Option | Description |
| ------------------- | -------------------------------------------------------- |
| `--json` | Emit run status (or the runs list) as a JSON object |

Shows the status of a specific run, or lists all runs if no ID is given. Run states: `created`, `running`, `completed`, `paused`, `failed`, `aborted`.

## List Installed Workflows
Expand Down
86 changes: 82 additions & 4 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2733,19 +2733,46 @@ def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]:
return inputs


def _workflow_run_payload(state: Any) -> dict[str, Any]:
"""Machine-readable summary of a run/resume outcome."""
return {
"run_id": state.run_id,
"workflow_id": state.workflow_id,
"status": state.status.value,
"current_step_id": state.current_step_id,
"current_step_index": state.current_step_index,
}


def _emit_workflow_json(payload: dict[str, Any]) -> None:
"""Write a workflow payload as machine-readable JSON to stdout.

Uses the builtin ``print`` rather than ``console.print`` so Rich
markup interpretation, syntax highlighting, and line-wrapping can
never alter the emitted JSON.
"""
print(json.dumps(payload, indent=2))


@workflow_app.command("run")
def workflow_run(
source: str = typer.Argument(..., help="Workflow ID or YAML file path"),
input_values: list[str] | None = typer.Option(
None, "--input", "-i", help="Input values as key=value pairs"
),
json_output: bool = typer.Option(
False,
"--json",
help="Emit the run outcome as a single JSON object instead of formatted text.",
),
):
"""Run a workflow from an installed ID or local YAML path."""
from .workflows.engine import WorkflowEngine

project_root = _require_specify_project()
engine = WorkflowEngine(project_root)
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
if not json_output:
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in abfa237. While --json is active, engine.execute()/engine.resume() now run inside a file-descriptor-level redirect (dup2 of fd 1 → fd 2), so both Python-level writes (the gate prompt) and inherited-fd subprocess output (the prompt step's CLI) land on stderr while stdout carries only the JSON object. Step progress stays visible on stderr; status doesn't run the engine, so it's unaffected. Added tests that exercise both pollution channels (a Python print and a real subprocess) via fd-level capture, plus the inactive no-op path, and documented the stdout/stderr split.


try:
definition = engine.load_workflow(source)
Expand All @@ -2767,8 +2794,9 @@ def workflow_run(
# Parse inputs
inputs = _parse_input_values(input_values)

console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})")
console.print(f"[dim]Version: {definition.version}[/dim]\n")
if not json_output:
console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})")
console.print(f"[dim]Version: {definition.version}[/dim]\n")

try:
state = engine.execute(definition, inputs)
Expand All @@ -2779,6 +2807,10 @@ def workflow_run(
console.print(f"[red]Workflow failed:[/red] {exc}")
raise typer.Exit(1)

if json_output:
_emit_workflow_json(_workflow_run_payload(state))
return

status_colors = {
"completed": "green",
"paused": "yellow",
Expand All @@ -2799,13 +2831,19 @@ def workflow_resume(
input_values: list[str] | None = typer.Option(
None, "--input", "-i", help="Updated input values as key=value pairs"
),
json_output: bool = typer.Option(
False,
"--json",
help="Emit the resume outcome as a single JSON object instead of formatted text.",
),
):
"""Resume a paused or failed workflow run."""
from .workflows.engine import WorkflowEngine

project_root = _require_specify_project()
engine = WorkflowEngine(project_root)
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")
if not json_output:
engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026")

inputs = _parse_input_values(input_values)

Expand All @@ -2821,6 +2859,10 @@ def workflow_resume(
console.print(f"[red]Resume failed:[/red] {exc}")
raise typer.Exit(1)

if json_output:
_emit_workflow_json(_workflow_run_payload(state))
return

status_colors = {
"completed": "green",
"paused": "yellow",
Expand All @@ -2834,6 +2876,11 @@ def workflow_resume(
@workflow_app.command("status")
def workflow_status(
run_id: str | None = typer.Argument(None, help="Run ID to inspect (shows all if omitted)"),
json_output: bool = typer.Option(
False,
"--json",
help="Emit run status as a single JSON object instead of formatted text.",
),
):
"""Show workflow run status."""
from .workflows.engine import WorkflowEngine
Expand All @@ -2849,6 +2896,21 @@ def workflow_status(
console.print(f"[red]Error:[/red] Run not found: {run_id}")
raise typer.Exit(1)

if json_output:
# Build on the shared run/resume payload so the common fields
# (including current_step_index) stay identical across commands.
payload = {
**_workflow_run_payload(state),
"created_at": state.created_at,
"updated_at": state.updated_at,
"steps": {
sid: sd.get("status", "unknown")
for sid, sd in state.step_results.items()
},
}
_emit_workflow_json(payload)
return

status_colors = {
"completed": "green",
"paused": "yellow",
Expand Down Expand Up @@ -2876,6 +2938,22 @@ def workflow_status(
console.print(f" [{sc}]●[/{sc}] {step_id}: {s}")
else:
runs = engine.list_runs()

if json_output:
payload = {
"runs": [
{
"run_id": r["run_id"],
"workflow_id": r.get("workflow_id"),
"status": r.get("status", "unknown"),
"updated_at": r.get("updated_at"),
}
for r in runs
]
}
_emit_workflow_json(payload)
return

if not runs:
console.print("[yellow]No workflow runs found.[/yellow]")
return
Expand Down
118 changes: 118 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3028,6 +3028,124 @@ def test_switch_workflow(self, project_dir):
assert "do-specify" not in state.step_results


class TestWorkflowJsonOutput:
"""Test the --json machine-readable output for run/resume/status."""

_WF = """
schema_version: "1.0"
workflow:
id: "json-wf"
name: "JSON WF"
version: "1.0.0"
steps:
- id: ask
type: gate
message: "Review"
options: [approve, reject]
- id: after
type: shell
run: "echo done"
"""

_WF_DONE = """
schema_version: "1.0"
workflow:
id: "json-done"
name: "JSON Done"
version: "1.0.0"
steps:
- id: only
type: shell
run: "echo done"
"""

def _write_wf(self, project_dir, text, name):
path = project_dir / f"{name}.yml"
path.write_text(text, encoding="utf-8")
return path

def _invoke(self, project_dir, args):
from typer.testing import CliRunner
from unittest.mock import patch
from specify_cli import app

runner = CliRunner()
with patch.object(Path, "cwd", return_value=project_dir):
return runner.invoke(app, args, catch_exceptions=False)

def test_run_json_completed(self, project_dir):
wf = self._write_wf(project_dir, self._WF_DONE, "done")
result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"])
assert result.exit_code == 0
payload = json.loads(result.stdout)
assert payload["workflow_id"] == "json-done"
assert payload["status"] == "completed"
assert "run_id" in payload

def test_run_json_paused(self, project_dir):
wf = self._write_wf(project_dir, self._WF, "gated")
result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"])
assert result.exit_code == 0
payload = json.loads(result.stdout)
assert payload["status"] == "paused"
assert payload["current_step_id"] == "ask"
assert payload["current_step_index"] == 0

def test_run_json_output_has_no_markup_or_ansi(self, project_dir):
wf = self._write_wf(project_dir, self._WF_DONE, "clean")
out = self._invoke(
project_dir, ["workflow", "run", str(wf), "--json"]
).stdout
# Machine output must be exactly the JSON object: no Rich markup
# tags and no ANSI escape sequences leaking in.
assert "\x1b[" not in out
assert "[/" not in out
assert out.strip() == json.dumps(json.loads(out), indent=2)

def test_run_default_output_is_human_not_json(self, project_dir):
wf = self._write_wf(project_dir, self._WF_DONE, "done2")
result = self._invoke(project_dir, ["workflow", "run", str(wf)])
assert result.exit_code == 0
assert "Running workflow" in result.stdout
with pytest.raises(json.JSONDecodeError):
json.loads(result.stdout)

def test_status_json_single_and_list(self, project_dir):
wf = self._write_wf(project_dir, self._WF, "gated2")
run = json.loads(
self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout
)
rid = run["run_id"]

single = json.loads(
self._invoke(project_dir, ["workflow", "status", rid, "--json"]).stdout
)
assert single["run_id"] == rid
assert single["status"] == "paused"
assert single["steps"]["ask"] == "paused"
# status --json carries the same step-position fields as run/resume
# so automation never has to branch on which command produced it.
assert single["current_step_id"] == run["current_step_id"]
assert single["current_step_index"] == run["current_step_index"]

listing = json.loads(
self._invoke(project_dir, ["workflow", "status", "--json"]).stdout
)
assert any(r["run_id"] == rid for r in listing["runs"])

def test_resume_json(self, project_dir):
wf = self._write_wf(project_dir, self._WF, "gated3")
rid = json.loads(
self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout
)["run_id"]
# Non-interactive resume re-runs the gate, which pauses again.
resumed = json.loads(
self._invoke(project_dir, ["workflow", "resume", rid, "--json"]).stdout
)
assert resumed["run_id"] == rid
assert resumed["status"] == "paused"


class TestResumeWithInputs:
"""Test that `workflow resume` can accept updated workflow inputs."""

Expand Down