From 295addf164cb28a9e0a3051be58e68d6f5aa0824 Mon Sep 17 00:00:00 2001 From: somo9909 Date: Mon, 15 Jun 2026 22:13:35 +0530 Subject: [PATCH 1/2] fix(cli): propagate --repo/--path to search/chat subparsers, fix --base-url help text (#1078) --- hub/agents/python/code/gaia_agent_code/cli.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/hub/agents/python/code/gaia_agent_code/cli.py b/hub/agents/python/code/gaia_agent_code/cli.py index 5da1c0557..280ba5c01 100644 --- a/hub/agents/python/code/gaia_agent_code/cli.py +++ b/hub/agents/python/code/gaia_agent_code/cli.py @@ -284,7 +284,7 @@ def _build_index_parser(): parser.add_argument( "--base-url", default=None, - help="Lemonade server URL (default: http://localhost:8000/api/v1)", + help="Lemonade server URL (default: http://localhost:13305/api/v1)", ) parser.add_argument( "--no-lemonade-check", @@ -309,9 +309,26 @@ def _build_index_parser(): help="Number of results to return (default: 10)", ) + search_p.add_argument( + "--repo", + default=None, + help="Path to repository root (overrides parent --repo)", + ) + sub.add_parser("status", help="Show index status") sub.add_parser("clear", help="Clear the index") - sub.add_parser("chat", help="Interactive code Q&A (CodeAgent + code_index tools)") + + chat_p = sub.add_parser("chat", help="Interactive code Q&A (CodeAgent + code_index tools)") + chat_p.add_argument( + "--repo", + default=None, + help="Path to repository root (overrides parent --repo)", + ) + chat_p.add_argument( + "--path", + default=None, + help="Alias for --repo (path to repository root)", + ) return parser From abd0579cd6ffe70922ef4d67be5e485f7f8a2837 Mon Sep 17 00:00:00 2001 From: somo9909 Date: Tue, 16 Jun 2026 00:48:03 +0530 Subject: [PATCH 2/2] fix(cli): use argparse.SUPPRESS for subparser --repo/--path flags - Prevents subparser default=None clobbering parent default='.' - Makes --path a true alias for --repo via dest='repo' Fixes #1078 --- .github/workflows/test_agent_behavior_e2e.yml | 46 +- context7.json | 2 + docs/docs.json | 4 +- docs/guides/email.mdx | 14 +- docs/plans/email-agent-packaging.mdx | 288 ++++++++++++ docs/releases/v0.21.1.mdx | 38 ++ hub/agents/python/code/gaia_agent_code/cli.py | 15 +- .../python/email/gaia_agent_email/agent.py | 27 +- .../tools/preference_tools.py | 131 +++++- .../gaia_agent_email/tools/profile_tools.py | 231 ++++++++++ .../email/tests/test_email_inbox_profiling.py | 431 ++++++++++++++++++ .../tests/test_email_preferences_persist.py | 415 +++++++++++++++++ src/gaia/apps/webui/package-lock.json | 4 +- src/gaia/apps/webui/package.json | 2 +- src/gaia/apps/webui/src/App.tsx | 20 + .../apps/webui/src/components/ChatView.tsx | 136 ++++-- .../apps/webui/src/components/Sidebar.css | 10 + .../apps/webui/src/components/Sidebar.tsx | 15 +- src/gaia/apps/webui/src/services/api.ts | 207 +++++---- .../src/stores/__tests__/chatStore.test.ts | 38 ++ src/gaia/apps/webui/src/stores/chatStore.ts | 17 + src/gaia/ui/_chat_helpers.py | 93 +++- src/gaia/ui/routers/chat.py | 55 ++- src/gaia/ui/routers/sessions.py | 6 + src/gaia/ui/run_manager.py | 185 ++++++++ src/gaia/ui/server.py | 1 + src/gaia/version.py | 2 +- tests/unit/chat/ui/test_chat_active_attach.py | 110 +++++ tests/unit/chat/ui/test_run_manager.py | 179 ++++++++ 29 files changed, 2546 insertions(+), 176 deletions(-) create mode 100644 docs/plans/email-agent-packaging.mdx create mode 100644 docs/releases/v0.21.1.mdx create mode 100644 hub/agents/python/email/gaia_agent_email/tools/profile_tools.py create mode 100644 hub/agents/python/email/tests/test_email_inbox_profiling.py create mode 100644 hub/agents/python/email/tests/test_email_preferences_persist.py create mode 100644 src/gaia/ui/run_manager.py create mode 100644 tests/unit/chat/ui/test_chat_active_attach.py create mode 100644 tests/unit/chat/ui/test_run_manager.py diff --git a/.github/workflows/test_agent_behavior_e2e.yml b/.github/workflows/test_agent_behavior_e2e.yml index c5679089d..2a702312f 100644 --- a/.github/workflows/test_agent_behavior_e2e.yml +++ b/.github/workflows/test_agent_behavior_e2e.yml @@ -27,23 +27,43 @@ jobs: - name: Setup Python environment uses: ./.github/actions/setup-venv with: - install-package: ".[dev]" + # [ui] carries fastapi/uvicorn + the RAG deps gaia.ui.server imports at + # boot — the harness spawns that server, so [dev] alone fails at import. + install-package: ".[dev,ui]" - - name: Ensure Lemonade - shell: powershell - run: | - gaia init --skip-lemonade - if ($LASTEXITCODE -ne 0) { throw "gaia init failed" } - - - name: Run behavior E2E harness + - name: Start Lemonade + run behavior E2E harness shell: powershell env: - LEMONADE_BASE_URL: http://localhost:13305/api/v1 + LEMONADE_BASE_URL: http://127.0.0.1:13305/api/v1 + # Loopback server — bypass any runner proxy so Python requests reaches it. + NO_PROXY: "localhost,127.0.0.1" run: | - python -m pytest tests/integration/eval/test_behavior_e2e.py ` - -m real_model -v --tb=short ` - --basetemp="${{ runner.temp }}\pytest-behavior" - if ($LASTEXITCODE -ne 0) { throw "behavior E2E harness failed" } + # Start Lemonade AND run the harness in a SINGLE PowerShell session. + # The Lemonade server (a detached Start-Process child) does not survive + # a GitHub Actions step boundary, so starting it in a separate step made + # the harness skip with "Lemonade server not reachable" even though the + # model loaded. start-lemonade.ps1 itself notes it must run in one + # session "to avoid process lifecycle issues"; test_examples.yml does + # the same. The previous `gaia init --skip-lemonade` step also blocked + # on an interactive prompt when the server was down (#1639). + try { + .\installer\scripts\start-lemonade.ps1 ` + -ModelName "Qwen3.5-35B-A3B-GGUF" ` + -Port 13305 ` + -CtxSize 32768 ` + -InitWaitTime 15 + + python -m pytest tests/integration/eval/test_behavior_e2e.py ` + -m real_model -v -rs --tb=short ` + --basetemp="${{ runner.temp }}\pytest-behavior" + if ($LASTEXITCODE -ne 0) { throw "behavior E2E harness failed" } + } + finally { + if ($env:LEMONADE_PROCESS_ID) { + Write-Host "Stopping Lemonade server (pid $env:LEMONADE_PROCESS_ID)..." + Stop-Process -Id $env:LEMONADE_PROCESS_ID -Force -ErrorAction SilentlyContinue + } + } - name: Upload artifacts on failure if: failure() diff --git a/context7.json b/context7.json index d26b06715..347d34346 100644 --- a/context7.json +++ b/context7.json @@ -1,5 +1,7 @@ { "$schema": "https://context7.com/schema/context7.json", + "url": "https://context7.com/amd/gaia", + "public_key": "pk_XqnEKNKLfMIJQneXnq5wZ", "projectTitle": "GAIA", "description": "AMD's open-source framework for building AI agents in Python and C++ that run entirely on local hardware, with AMD NPU and GPU acceleration on Ryzen AI processors.", "folders": ["docs"], diff --git a/docs/docs.json b/docs/docs.json index bdc15ac34..4ea952dff 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -408,6 +408,7 @@ "pages": [ "plans/agent-hub", "plans/agent-hub-ui", + "plans/email-agent-packaging", "spec/agent-hub-restructure", "spec/agent-skills", "plans/skill-format", @@ -440,6 +441,7 @@ "group": "Release Notes", "pages": [ "releases/index", + "releases/v0.21.1", "releases/v0.21.0", "releases/v0.20.1", "releases/v0.20.0", @@ -496,7 +498,7 @@ "navbar": { "links": [ { - "label": "v0.21.0 \u00b7 Lemonade 10.2.0", + "label": "v0.21.1 \u00b7 Lemonade 10.2.0", "href": "https://github.com/amd/gaia/releases" }, { diff --git a/docs/guides/email.mdx b/docs/guides/email.mdx index d1ad92666..b950a39ee 100644 --- a/docs/guides/email.mdx +++ b/docs/guides/email.mdx @@ -127,16 +127,16 @@ Each row carries inline action buttons: If you haven't connected Google yet, the agent surfaces a one-click **Connect Google** button inline in the chat — no need to navigate to Settings → Connections manually. -### In-session preferences (in-memory, wiped on restart) +### Cross-session preferences (persisted via memory store) -Tell the agent how you want classification to behave for this session: +Tell the agent how you want classification to behave: -- *"Treat boss@company.com as urgent"* → calls `set_priority_sender`. That sender bypasses the heuristic and lands in **Urgent** for the rest of the session. +- *"Treat boss@company.com as urgent"* → calls `set_priority_sender`. That sender bypasses the heuristic and lands in **Urgent** across all future sessions. - *"Treat newsletter@stripe.com as low priority"* → calls `set_low_priority_sender`. That sender lands in **Suggested archives**. - *"Default informational mail to archive"* → calls `set_category_default("informational", "archive")`. Informational items lift into **Suggested archives** until you reset. - *"Clear my preferences"* → calls `clear_session_preferences`. -Preferences are stored in process memory only — restarting the agent (or quitting Agent UI) wipes them. This is deliberate: the goal is to prove the value of session-scoped learning before we wire up persistent memory. Once persistent memory ships, the same tools will write through to it without changing this surface. +Preferences persist across agent restarts via the memory store (`email:preferences`). They fall back to session-only (lost on restart) when `GAIA_MEMORY_DISABLED=1` or when the embedding backend is unreachable at startup. Incognito sessions never write preferences to persistent storage. ## Action surface @@ -144,10 +144,14 @@ Preferences are stored in process memory only — restarting the agent (or quitt `list_inbox`, `get_message`, `get_thread`, `search_messages`, `list_labels`, `triage_inbox`, `pre_scan_inbox` -### Session preferences (in-memory; wiped on agent restart) +### Preferences (persisted across restarts via memory store) `set_priority_sender`, `set_low_priority_sender`, `set_category_default`, `clear_session_preferences` +### Inbox profiling + +`profile_inbox` — asks "who emails me most?" and returns a frequency ranking of senders with their dominant category (e.g. *urgent*, *informational*) and the timestamp of their most recent message. Profiling is built from the interaction history the agent accumulates during triage, so it improves the more you use the agent. + ### Organize (reversible via the undo log) `archive_message`, `mark_read`, `mark_unread`, `add_star`, `remove_star`, `label_message`, `move_to_label` diff --git a/docs/plans/email-agent-packaging.mdx b/docs/plans/email-agent-packaging.mdx new file mode 100644 index 000000000..505360be9 --- /dev/null +++ b/docs/plans/email-agent-packaging.mdx @@ -0,0 +1,288 @@ +--- +title: "Email Agent Packaging" +description: "Distribute the email triage agent as an npm-installable local sidecar — a thin JS/TS client plus per-platform Python and C++ binaries — for embedding in third-party web apps" +icon: "package" +--- + +# Email Agent Packaging + + +**Milestone:** Email Agent & Platform Foundations | **Status:** Planning | **Priority:** High + + + +**Work in Progress** — this plan is the authoritative scope for the packaging milestone. Distribution mechanics (npm layout, signing ownership) are settled below; open decisions are called out explicitly. + + +--- + +## Overview + +The email triage agent already builds as a Python wheel + sdist (`gaia-agent-email`) and is shareable as a GitHub prerelease asset. That path serves Python integrators and partner evaluation, but it does **not** serve the primary consumer: a **JavaScript/Node host web app that wants to embed the agent as a local service** with a one-command install and no Python/C++ toolchain on the user's machine. + +This milestone delivers that path: the email agent distributed through **npm** as a thin **JS/TS client** plus **per-platform, self-contained binaries** (a frozen Python build and a native C++ build), runnable as a **local sidecar** the host app spawns and talks to over a loopback REST API. + +### Goals + +- A host web app can run `npm install @amd-gaia/agent-email` and, with one **build-time fetch** step, get a working local email agent — **no Python install, no C++ toolchain, no compile step**. +- The npm package stays **lightweight** (JS/TS client + fetch CLI only); the large binaries live in **R2** object storage, the single source of truth. +- **One REST contract** backs both the Python and C++ implementations; the host integrates once and is implementation-agnostic. +- The platform binary is **present on disk at build/sign time** (fetched before the host signs its app), so it's covered by the host's code signature and runs **fully offline at runtime**. +- Each platform fetches **only its own binary**, integrity-verified against a **pinned SHA-256** — fail loudly on mismatch. +- Binaries are **signed/notarized** so a host app that bundles them stays signable. + +### Non-goals (this milestone) + +- Re-enabling the paused PyPI publish path (tracked separately under #1179). +- An in-process Node native addon (N-API). Captured as a future fast-path in [Deferred](#deferred), not built here. +- Third-party host glue. We ship the agent + client + contract; an external host owns its own integration. **Exception:** the GAIA Agent UI is our *reference consumer* and its migration to the sidecar is in scope as the validation phase ([Phase 6](#phase-6-agent-ui-as-first-consumer-validation)). +- New triage capabilities. This is a **packaging and distribution** milestone, not a feature milestone. + +--- + +## Current State + +| Asset | State | +|-------|-------| +| `gaia-agent-email` wheel + sdist | ✅ Builds via `build-agent-wheel` composite action | +| GitHub-prerelease share | ✅ `build_agent_package.yml` (`workflow_dispatch`), `agent-pkg-email-v0.1.0` published — kept as the **manual** mirror | +| R2 distribution backend (`workers/agent-hub/`) | ✅ Cloudflare Worker with `POST /publish` (Bearer auth, version-immutability, server-side SHA-256, `agents///…` layout, index rebuild) — **reuse as the canonical store** | +| CI workflow that pushes binaries to R2 | ❌ Does not exist — `build_agents.yml` uploads only GitHub Actions artifacts ("R2 … handled by other issues"); `build_agent_package.yml` is **manual** → GitHub release — **this milestone** | +| PyPI publish | ⏸ Paused (#1179) — out of scope here | +| REST API surface | ✅ Email REST router exists (triage, connectors); needs a published contract | +| Agent UI email integration | ⚠️ Today the UI backend imports the `gaia-agent-email` **Python wheel** (`amd-gaia[api]` auto-mounts its router); the React UI talks to it via the backend. Phase 6 migrates this to the npm sidecar. | +| npm client | ❌ Does not exist — **this milestone** | +| Frozen Python binary (no-interpreter) | ❌ Does not exist — **this milestone** | +| C++ implementation + binary | ❌ `hub/agents/cpp/` is an empty placeholder — **this milestone** | +| Build-time fetch CLI + SHA-256 manifest | ❌ Does not exist — **this milestone** | +| Signed/notarized binaries | ❌ Does not exist — **this milestone** | +| Sidecar lifecycle contract | ❌ Not specified — **this milestone** | + +--- + +## Target Architecture + +### Sidecar model + +The agent runs as a **separate local process** the host app spawns, supervises, and talks to over loopback HTTP. This is the default because it is **language-agnostic** (the Python and C++ builds look identical to the host — spawn a binary, hit a port), **crash-isolated** (a segfault in the agent does not take down the host), and allows **independent release cadence**. + +``` +┌─────────────────────────┐ loopback REST ┌──────────────────────────┐ +│ Host web app (Node) │ ───────────────────────────▶ │ Email agent sidecar │ +│ │ POST /v1/email/triage │ (frozen Python OR C++) │ +│ @amd-gaia/agent-email │ GET /health /version │ REST server + connectors │ +│ (JS/TS client) │ ◀─────────────────────────── │ │ +│ • spawns binary │ └──────────────────────────┘ +│ • health/version check │ +│ • typed REST calls │ +└─────────────────────────┘ +``` + +### Distribution — thin npm package + build-time R2 fetch + +The npm package carries **only** the JS/TS client and a small fetch CLI — **no binaries**. The large per-platform binaries live in **R2** (the existing `workers/agent-hub/` backend), which becomes the single source of truth. The host pulls its platform binary with an **explicit build-time step**, before it signs its app. + +``` +@amd-gaia/agent-email ← npm: JS/TS client + `fetch` CLI + SHA-256 manifest. NO binaries. + │ build-time: npx @amd-gaia/agent-email fetch + ▼ +R2 via Agent Hub Worker ← canonical, immutable, versioned (existing POST /publish + bucket) + agents/email//email-win32-x64.exe + agents/email//email-darwin-arm64 + agents/email//email-darwin-x64 + agents/email//email-linux-x64 + │ verified against pinned SHA-256, written to the host's resources dir + ▼ +host app bundle (signed) ← binary on disk at sign time → covered by the host signature +``` + +The binaries are stored under the **existing Agent Hub R2 layout** (`agents///…`) and served by the same Worker that already fronts the bucket. The fetch CLI resolves `${process.platform}-${process.arch}`, downloads that one object, **verifies it against a SHA-256 pinned in the package** (fail loudly on mismatch), and writes it to a host-specified resources directory: + +```jsonc +// @amd-gaia/agent-email/package.json (the only published binary-bearing artifact is the manifest) +{ + "name": "@amd-gaia/agent-email", + "version": "0.1.0", + "bin": { "agent-email": "./cli.js" }, // exposes `npx @amd-gaia/agent-email fetch` + "files": ["dist/", "binaries.lock.json"] // SHA-256 + R2 keys per platform, no binaries +} +``` + +```ts +// resolves the *fetched* binary (written by the build-time fetch step), not an npm sub-package +function binaryPath(resourcesDir: string): string { + const ext = process.platform === "win32" ? ".exe" : ""; + return path.join(resourcesDir, `email-${process.platform}-${process.arch}${ext}`); +} +``` + +**Build-time, not runtime.** The fetch runs in the host's build pipeline *before* code-signing, so the binary is on disk when the host signs and notarizes — it is covered by the host's signature and needs no network at runtime. A **runtime / first-init download is explicitly rejected**: a binary fetched after the app is signed is not covered by the signature (macOS Gatekeeper quarantine blocks it; Windows SmartScreen flags it) and breaks offline first-run. + +**Why this over the alternatives:** + +- **Binaries in npm via `optionalDependencies`** (esbuild/swc pattern) — considered, and viable; it's the most reproducible (lockfile-pinned) and lowest-friction for the consumer (`npm install` just works). Rejected as the default here because frozen-Python binaries are large (~40–100 MB each), and we want **one canonical binary store (R2)** shared across the npm path, the `gaia agent install` Hub path, and the GitHub manual share — rather than duplicating tens of MB into the npm registry per release. +- **`postinstall` download** — breaks under `npm install --ignore-scripts` and behind proxies; rejected. The fetch is an **explicit build step**, not an install hook. +- **Runtime / first-init download** — rejected (signing + offline reasons above). + +A **runtime-fetch mode** is offered *only* for the non-bundled server/dev consumer (behind an explicit flag, same SHA-256 verification, fails loudly when offline) — never into a signed app bundle. + +### Lifecycle handshake + +The host app implements a deterministic startup sequence; the client SDK provides helpers for each step: + +1. **Resolve** the platform binary (`binaryPath()` above) — the binary fetched at build time from R2 and SHA-verified. +2. **Spawn** it on an allocated loopback port (`agent --port

--host 127.0.0.1`). +3. **Health** — poll `GET /health` until ready or timeout (fail loudly on timeout — no silent "assume ready"). +4. **Version-check** — `GET /version` returns `{ apiVersion, agentVersion }`; the client refuses a mismatched `apiVersion` with an actionable error rather than making calls against an incompatible binary. +5. **Ready** — typed REST calls; clean shutdown on host exit. + +### One contract, two implementations + +A single **OpenAPI spec** is the source of truth for the REST surface. The Python and C++ servers both implement it; the JS/TS client is generated/validated against it. Breaking changes bump a SemVer'd `apiVersion` advertised on `/version`. This is what makes the host integration implementation-agnostic. + +--- + +## Workstreams + +### Phase 1 — REST contract & client SDK + +**Goal:** lock the wire contract and ship the JS/TS client against the existing Python server. + +- Author the **OpenAPI spec** for the email agent REST surface (triage, connectors, health, version). Commit it to the repo as the source of truth. +- Add `GET /version` (`{ apiVersion, agentVersion }`) and confirm `GET /health` semantics on the existing Python server. +- Build `@amd-gaia/agent-email` (TypeScript): typed REST client + lifecycle helpers (spawn, health-poll, version-check, shutdown). Full type defs, no runtime binary yet. +- Unit + contract tests: client validated against the OpenAPI spec; lifecycle helpers tested against a stub server. + +**Exit criteria:** a Node script can `import` the client, point it at a manually-started Python server, and run a triage round-trip with types. + +### Phase 2 — Frozen Python binary + +**Goal:** the Python implementation runs with **no interpreter on the user's machine**. + +- Freeze `gaia-agent-email` (PyInstaller or Nuitka) into a single self-contained executable that boots the REST server. +- Per-platform builds: `win32-x64`, `darwin-arm64`, `darwin-x64`, `linux-x64`. +- Smoke test each frozen binary in a **clean environment with no Python**: spawn → health → version → triage. +- Compute and record each binary's **SHA-256** (feeds the Phase 3 pinned manifest); measure per-platform size for R2 storage/transfer. + +**Exit criteria:** all four frozen binaries pass the no-Python smoke test and the lifecycle handshake. + +### Phase 3 — Fully-automated R2 distribution + thin npm package + +**Goal:** a single version tag publishes everything — binaries to R2 and the thin npm package — with **zero manual steps**. `npm install @amd-gaia/agent-email` + one build-time fetch then yields a working, signature-ready agent. + +- **Fully-automated, tag-triggered publish workflow** (no `workflow_dispatch` gate): on a namespaced tag, the workflow builds every platform binary → computes SHA-256 → pushes each to R2 via the existing Worker `POST /publish` (Bearer `PUBLISH_TOKENS` from a GH Actions secret) → writes the pinned manifest → publishes the npm package. Re-running a published version is a **no-op** (the Worker's native version-immutability), not an overwrite. +- Reuse the existing pipeline rather than forking: extend `build_agents.yml`'s release-bundle leg (it already produces per-platform binaries + checksums) to call `POST /publish` instead of stopping at GitHub-artifact upload. +- Confirm/extend `POST /publish` to accept **multiple per-platform binaries under one `/`** (today it stores `gaia-agent.yaml` + an artifact; the four OS/arch binaries must coexist in one version without tripping per-version immutability). +- Build the thin `@amd-gaia/agent-email` npm package: JS/TS client + `fetch` CLI + a **`binaries.lock.json`** manifest pinning each platform's R2 key and **SHA-256**. +- `fetch` CLI: resolve platform → download from R2 → **verify SHA-256 (fail loudly on mismatch)** → write to the host-specified resources dir → `chmod +x` on POSIX. +- Document the host integration: run `npx @amd-gaia/agent-email fetch` as a **build step before signing**, and (for Electron-style hosts) keep the fetched binary outside the `asar` (`extraResources`) so it can be spawned. +- **Atomic release:** R2 upload → manifest write → npm publish on one tag; manifest SHAs must match the uploaded R2 objects, and the npm version must reference an R2 version that exists, or the whole release fails. + +**Exit criteria:** pushing a version tag publishes binaries to R2 and the npm package end-to-end with no human step; on three real OSes, `npm install` + `npx @amd-gaia/agent-email fetch` retrieves the platform binary, SHA-256 verifies, and the lifecycle handshake succeeds; install works under `--ignore-scripts`. + +### Phase 4 — Code signing & notarization + +**Goal:** bundled binaries don't break a host app's own signing. + +- macOS: sign + **notarize** the `darwin` binaries (unsigned/un-notarized binaries break the host app's notarization). +- Windows: Authenticode-sign the `win32` binary (unsigned trips SmartScreen on the host's signed app). +- **Open decision:** *who holds the signing cert* — we sign and publish signed binaries, or we ship reproducible builds the partner signs under their cert. Resolve before Phase 3 publish. + +**Exit criteria:** signed binaries verify on each OS; a test host app bundling them passes notarization/SmartScreen. + +### Phase 5 — C++ implementation & binary + +**Goal:** a second implementation of the **same contract**, distributed identically. + +- Stand up the C++ email agent server under `hub/agents/cpp/email/`, implementing the Phase 1 OpenAPI contract (same routes, same `/version` `apiVersion`). +- Build static / dependency-bundled binaries for the four platforms via the existing C++ build infrastructure (`build_cpp.yml`, `cpp/packaging/`). +- Distribute via the **same R2 + thin-package mechanism** — upload to the same R2 key layout, pin SHA-256 in the manifest; the host install + fetch path is unchanged, only the binary's provenance differs. +- Conformance test: the C++ binary passes the **same contract test suite** as the Python binary (identical request/response behavior). + +**Exit criteria:** the C++ binary is a drop-in for the frozen Python binary behind the same client, validated by a shared contract test suite. + +### Phase 6 — Agent UI as first consumer (validation) + +**Goal:** dogfood the package. The GAIA Agent UI (`@amd-gaia/agent-ui`, Electron) consumes the email agent as the **npm-fetched binary sidecar** instead of importing the `gaia-agent-email` **Python wheel** into its backend. This is the milestone's real acceptance test — if our own app can install, fetch, spawn, and triage through the package, an external host can too. + +- Add `@amd-gaia/agent-email` as a dependency of the Electron app; run the build-time `fetch` in the app's build; spawn the sidecar via the client's lifecycle handshake (spawn → health → version → ready). +- Re-point the UI's email features (`ChatView`, `EmailPreScanCard`, `EmailConnectCta`, `useConnectorsSSE` triage calls) at the sidecar instead of the backend's mounted `/v1/email` router. +- **Connectors via shared OS keyring (no token handoff):** the backend keeps owning the OAuth connect flow and keyring writes (connect CTAs unchanged); the sidecar reads the **same OS keyring** through its bundled `gaia.connectors`. This relies on the connectors module's already-documented "two processes share the keyring" behavior — note the process-local token cache + concurrent-refresh caveat, but no new auth protocol. +- **Remove the built-in path:** stop pulling the `gaia-agent-email` wheel into the UI backend's dependency/auto-mount chain, so the sidecar is the only email surface — proving the package fully replaces the in-process agent. +- Keep the Python backend for everything non-email; only email migrates to the sidecar in this phase. + +**Exit criteria:** with the `gaia-agent-email` wheel removed from the UI backend, the Agent UI completes a triage + pre-scan flow through the spawned `@amd-gaia/agent-email` sidecar, and an existing Gmail/Outlook connection (granted via the unchanged connect flow) is picked up by the sidecar from the shared keyring. + +--- + +## CI / Release Pipeline + +**Deployment and distribution are fully automated — a version tag is the only human action.** No `workflow_dispatch`, no manual upload, no manual npm publish. + +- **One tag-triggered workflow** does the whole release: build matrix → `chmod +x` + sign + SHA-256 → push each binary to R2 via the Worker `POST /publish` → write the pinned manifest → `npm publish`. The current manual `build_agent_package.yml` (`workflow_dispatch` → GitHub release) is demoted to a fallback mirror, not the release path. +- **Reuse the existing legs:** extend `build_agents.yml` (C++ binaries + checksums, already tag-aware) and the `build-agent-wheel` action to push to R2 via `POST /publish` rather than stopping at GitHub-artifact upload — the workflow comment "R2 … handled by other issues" is exactly this work. +- **Idempotent by construction:** the Worker rejects republishing an existing `@` (native version-immutability), so a re-run or an unchanged-version tag is a safe no-op — no custom overwrite logic. +- **Atomic release:** R2 upload → manifest write → npm publish on a single tag. Manifest SHA-256s must match the uploaded R2 objects, and the npm version must reference an R2 version that exists, or the whole release fails — no client-without-binary or stale-SHA states. +- **Auth:** the publish step authenticates to the Worker with a Bearer `PUBLISH_TOKENS` secret stored in GitHub Actions (scoped to the `email` author); never embedded in the package. +- **Canonical store = R2, mirror = GitHub:** R2 (`workers/agent-hub/`) is the programmatic source of truth; the `agent-pkg-email-*` GitHub release stays as the manual, no-login mirror. +- **Namespacing:** keep release tags namespaced (`agent-pkg-email-*`) so they never fire the paused PyPI workflows (`publish.yml` / `publish_agents.yml`). +- **Distribution registries:** the existing **`@amd-gaia`** npm scope (already hosts `@amd-gaia/agent-ui`). The thin model needs **one public package, `@amd-gaia/agent-email`** — no per-platform sub-packages. +- **npm auth = trusted publishing (OIDC), no stored token.** The package is configured as an npm trusted publisher; the publish job requests `id-token: write` and runs `npm publish --access public` with provenance — **no `NPM_TOKEN` secret to store or rotate**. This mirrors the repo's PyPI trusted-publishing pattern (`publish_agents.yml`). The publish must run from the exact workflow file registered as the trusted publisher on npm. + +--- + +## Versioning & Contract Governance + +- **`apiVersion`** (SemVer) is the host-facing contract version, advertised on `/version`. Breaking REST changes bump its major. +- **Package version** tracks the agent build; the npm package, the R2 binary version (`agents/email//…`), and the pinned `binaries.lock.json` manifest always publish in lockstep — the manifest references the matching R2 version, never a floating one. +- The **OpenAPI spec is the single source of truth** — client, Python server, and C++ server are all validated against it in CI. A change to the contract that isn't reflected in all three fails CI. +- Client refuses a binary whose `apiVersion` major it doesn't support — **fail loudly**, never best-effort against an incompatible server. + +--- + +## Testing Strategy + +| Layer | Test | +|-------|------| +| Contract | Client + both servers validated against the OpenAPI spec in CI | +| Lifecycle | spawn → health → version → triage → shutdown, against each binary | +| Clean-env | Frozen Python binary on a machine with **no Python**; C++ binary with **no toolchain** | +| Per-platform | Full install + handshake on `win32-x64`, `darwin-arm64`, `darwin-x64`, `linux-x64` | +| Conformance | Python and C++ binaries pass the **same** contract suite (drop-in equivalence) | +| Signing | Notarization (macOS) / SmartScreen (Windows) on a test host app that runs the build-time fetch then bundles + signs the binary | +| Integrity | `fetch` rejects a tampered/size-mismatched R2 object (SHA-256 fail-loud); succeeds on a match | +| Offline runtime | After a build-time fetch, the agent spawns and serves with **no network** | +| Install shape | `npm install` pulls only the thin package; `--ignore-scripts` still works (fetch is an explicit step, not a hook) | + +--- + +## Risks + +- **Runtime download mistaken for build-time fetch** — if a host wires the fetch at runtime/first-init instead of at build time, the binary lands outside its code signature (Gatekeeper/SmartScreen blocks) and breaks offline first-run. *Mitigation:* make `fetch` an explicit, documented build step; the integration guide leads with "fetch before sign"; the runtime-fetch flag is opt-in and documented as server/dev-only. +- **R2 availability / integrity at build time** — a build can't fetch if R2 is down or an object is corrupt. *Mitigation:* immutable versioned keys, SHA-256 pinned in the package (fail loudly on mismatch), and the GitHub release as a manual fallback mirror. +- **Manifest ↔ R2 drift** — a published SHA that doesn't match the uploaded object bricks every consumer. *Mitigation:* the atomic release gates publish on manifest SHAs matching R2 objects. +- **Signing ownership unresolved** — blocks the first signed release. *Mitigation:* resolve the cert-ownership decision (us vs. partner) before Phase 4. +- **Contract drift between Python and C++** — two implementations of one spec can diverge. *Mitigation:* shared OpenAPI source of truth + a single conformance suite both must pass in CI. +- **Electron asar packaging** — binaries inside an asar can't be spawned. *Mitigation:* document the `asarUnpack` requirement prominently in the integration guide. +- **Executable-bit loss** — a binary published without the exec bit fails to spawn on POSIX. *Mitigation:* CI `chmod +x` + a post-pack assertion. + +--- + +## Deferred + +- **In-process Node native addon (N-API)** for the C++ build — lowest latency, in-memory data sharing, but couples to the host's exact Node/Electron ABI and makes a crash fatal to the host. Revisit as a v2 fast-path once the sidecar path is proven. +- **PyPI publish** — remains paused under #1179; the npm path does not depend on it. +- **Additional agents** — the npm mechanism is designed to generalize (swap the agent id), but onboarding other agents is out of scope for this milestone. + +--- + +## Exit Criteria (Milestone) + +1. `npm install @amd-gaia/agent-email` + `npx @amd-gaia/agent-email fetch` on each of the four target platforms yields a working agent with no Python/C++ prerequisites, pulling only that platform's binary from R2 and SHA-256-verifying it. +2. The JS/TS client runs the full lifecycle handshake and a triage round-trip against the frozen Python binary. +3. Binaries are signed/notarized and verified inside a test host app that fetched at build time; the agent then runs **offline**. +4. The C++ binary passes the same contract suite as the Python binary and is a drop-in behind the same client. +5. The thin npm package installs under `--ignore-scripts`; `fetch` fails loudly on a SHA-256 mismatch or when offline. +6. The OpenAPI contract is published and enforced in CI across client + both servers. +7. **Deployment is fully automated:** pushing a version tag publishes binaries to R2 and the npm package end-to-end with no manual step, and re-running the release is a safe no-op. +8. **Dogfooded:** the GAIA Agent UI runs email triage through the `@amd-gaia/agent-email` sidecar with the `gaia-agent-email` Python wheel removed from its backend, picking up connector tokens from the shared OS keyring. diff --git a/docs/releases/v0.21.1.mdx b/docs/releases/v0.21.1.mdx new file mode 100644 index 000000000..8948aa7da --- /dev/null +++ b/docs/releases/v0.21.1.mdx @@ -0,0 +1,38 @@ +--- +title: "v0.21.1" +description: "Patch release: unblocks NPU setup — `gaia init --profile npu` was failing for every NPU user — and gives the email agent a persistent memory store to build personalization on." +--- + +# GAIA v0.21.1 Release Notes + +GAIA v0.21.1 is a patch release on top of v0.21.0. The headline fix unblocks NPU setup: `gaia init --profile npu` was failing at the model download with an HTTP 400, so no one on a Ryzen AI NPU could get set up — it now downloads, verifies, and runs inference on the NPU. The release also wires a persistent memory store into the email agent, the foundation the upcoming personalization features build on. + +**Why upgrade:** +- **NPU setup works again** — `gaia init --profile npu` was dead on arrival, failing before any NPU user could finish setup. It now pulls the built-in model correctly and runs on the NPU. +- **The email agent remembers across sessions** — `gaia email` now boots with a persistent memory store instead of forgetting everything on restart, the groundwork for inbox personalization landing in upcoming releases. + +--- + +## What's New + +### Persistent memory for the email agent — `gaia email` + +The email agent used to start from scratch every session — anything it learned about your inbox was wiped on restart. It now boots with a persistent memory store and the tools to read and write it, so learned state survives across runs. This is the foundation for the inbox personalization (priority senders, profiling, behavioral learning) coming in upcoming releases — triage behavior itself is unchanged in this release (PR [#1632](https://github.com/amd/gaia/pull/1632)). + +--- + +## Bug Fixes + +- **`gaia init --profile npu` failed before any NPU user could finish setup** (PR [#1658](https://github.com/amd/gaia/pull/1658)) — the NPU profile pulled its built-in model with a `recipe` argument, which Lemonade rejects with an HTTP 400 unless you're registering a new user model. The built-in model is now pulled by name only, so the NPU profile downloads, verifies, and runs inference on the NPU. The fix also adds a testing rule so this class of fresh-install bug can't slip past a warm cache and a mocked client again. + +--- + +## Full Changelog + +**3 commits** since v0.21.0: + +- `cc6f2f39` — docs(plans): email agent packaging milestone (npm + sidecar binaries) (#1644) +- `801a5c1d` — feat(email): wire MemoryMixin into the email agent (#1114) (#1632) +- `a3e86b02` — fix(init): pull built-in NPU/FLM model by name, not recipe (#1655) (#1658) + +Full Changelog: [v0.21.0...v0.21.1](https://github.com/amd/gaia/compare/v0.21.0...v0.21.1) diff --git a/hub/agents/python/code/gaia_agent_code/cli.py b/hub/agents/python/code/gaia_agent_code/cli.py index 280ba5c01..a34638443 100644 --- a/hub/agents/python/code/gaia_agent_code/cli.py +++ b/hub/agents/python/code/gaia_agent_code/cli.py @@ -308,10 +308,10 @@ def _build_index_parser(): default=10, help="Number of results to return (default: 10)", ) - + # FIX 1: use SUPPRESS so the parent's default="." is preserved when --repo is omitted search_p.add_argument( "--repo", - default=None, + default=argparse.SUPPRESS, help="Path to repository root (overrides parent --repo)", ) @@ -319,14 +319,17 @@ def _build_index_parser(): sub.add_parser("clear", help="Clear the index") chat_p = sub.add_parser("chat", help="Interactive code Q&A (CodeAgent + code_index tools)") + # FIX 2: use SUPPRESS so the parent's default="." is preserved when --repo is omitted chat_p.add_argument( "--repo", - default=None, + default=argparse.SUPPRESS, help="Path to repository root (overrides parent --repo)", ) + # FIX 3: dest="repo" makes --path a true alias for --repo, no changes needed in cmd_index chat_p.add_argument( "--path", - default=None, + dest="repo", + default=argparse.SUPPRESS, help="Alias for --repo (path to repository root)", ) @@ -526,7 +529,7 @@ def main(): parser.add_argument( "--path", "-p", - type=str, + type=str, default=None, help="Project directory path. Creates directory if it doesn't exist.", ) @@ -623,4 +626,4 @@ def main(): if __name__ == "__main__": - sys.exit(main()) + sys.exit(main()) \ No newline at end of file diff --git a/hub/agents/python/email/gaia_agent_email/agent.py b/hub/agents/python/email/gaia_agent_email/agent.py index 2d6010f5c..5e80e98ae 100644 --- a/hub/agents/python/email/gaia_agent_email/agent.py +++ b/hub/agents/python/email/gaia_agent_email/agent.py @@ -53,6 +53,7 @@ class never passes ``use_claude=True`` / ``use_chatgpt=True`` to PreferenceToolsMixin, init_session_preferences, ) +from gaia_agent_email.tools.profile_tools import ProfileToolsMixin from gaia_agent_email.tools.read_tools import ReadToolsMixin from gaia_agent_email.tools.reply_tools import ReplyToolsMixin from gaia_agent_email.tools.summarize_tools import SummarizeToolsMixin @@ -113,9 +114,9 @@ class never passes ``use_claude=True`` / ``use_chatgpt=True`` to shows the user the literal recipient/subject/body; trust ONLY what appears there. - Preference tools (set_priority_sender, set_low_priority_sender, - set_category_default, clear_session_preferences) — mutate session-scoped - classification preferences. Confirm the change in plain English; the - preferences are wiped on agent restart by design. + set_category_default, clear_session_preferences) — mutate persistent + classification preferences that survive across restarts. Confirm the + change in plain English. PRE-SCAN BEHAVIOR: When the user asks for a pre-scan, morning brief, triage view, or "what's @@ -151,6 +152,7 @@ class EmailTriageAgent( CalendarToolsMixin, PreferenceToolsMixin, PhishingToolsMixin, + ProfileToolsMixin, ): """Email Triage Agent — Gmail + Calendar through the connectors framework, all body inference local on Lemonade. @@ -270,6 +272,11 @@ def __init__(self, config: Optional[EmailAgentConfig] = None): memory_db.parent.mkdir(parents=True, exist_ok=True) self.init_memory(db_path=memory_db, context="email") + # Restore preferences from the previous session. Must come after + # init_memory() (so _memory_store is set) and after + # _session_preferences is set (done above). + self._load_persisted_preferences() + # LLM connection. Default to Lemonade — the config's base_url # allowlist guarantees the host is local. effective_model_id = config.model_id or DEFAULT_MODEL_NAME @@ -321,6 +328,7 @@ def _register_tools(self) -> None: self._register_calendar_tools() self._register_preference_tools() self._register_phishing_tools() + self._register_profile_tools() self.register_memory_tools() # -- Phase 2 multi-inbox routing (#1603) ------------------------------- @@ -456,7 +464,10 @@ def _triage_all_backends(self, *, max_messages: int) -> dict: ``mailbox`` tag and its id is remembered for downstream action routing. """ from gaia_agent_email.tools import read_tools - from gaia_agent_email.tools.read_tools import triage_inbox_impl + from gaia_agent_email.tools.read_tools import ( + extract_sender_email, + triage_inbox_impl, + ) from gaia_agent_email.tools.triage_heuristics import group_by_category # Reference the factory via the read_tools module so the existing @@ -490,6 +501,14 @@ def _triage_all_backends(self, *, max_messages: int) -> dict: # Thread ids share the provenance map so get_thread / # summarize_thread route to the right mailbox too. self._remember_message_mailbox(item.get("thread_id"), provider) + # Record interaction for inbox profiling (#1289). Memory-guarded + # inside _record_interaction — silently skips when disabled. + # Recorded BEFORE the max_messages cap below on purpose: triage + # already classified this item, so its sender history is real + # even if the cap drops it from the returned view. + sender_addr = extract_sender_email(item.get("from", "")) + if sender_addr: + self._record_interaction(sender_addr, item.get("category", "")) merged.append(item) merged = merged[:max_messages] # Re-group the merged, capped list so the bucketed view matches what the diff --git a/hub/agents/python/email/gaia_agent_email/tools/preference_tools.py b/hub/agents/python/email/gaia_agent_email/tools/preference_tools.py index 9c5777eff..457727cfe 100644 --- a/hub/agents/python/email/gaia_agent_email/tools/preference_tools.py +++ b/hub/agents/python/email/gaia_agent_email/tools/preference_tools.py @@ -1,24 +1,24 @@ # Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. # SPDX-License-Identifier: MIT -"""Session-scoped preference tools mixin for ``EmailTriageAgent``. +"""Persistent preference tools mixin for ``EmailTriageAgent``. -These tools mutate ``self._session_preferences`` on the agent instance — -an in-memory dict that lives for the lifetime of the agent and is wiped -on restart. The deliberate scoping keeps the daily-driver demo focused -on proving the value before investing in a persistent memory subsystem; -once the broader memory work lands, persisting these preferences is a -direct upgrade path. +These tools mutate ``self._session_preferences`` on the agent instance and +persist the current snapshot to the agent's MemoryStore so that preferences +survive across restarts. On agent construction, ``_load_persisted_preferences`` +seeds ``_session_preferences`` from the stored snapshot. + +When memory is disabled (``self._memory_store is None``) the tools still work +in-process — they just cannot persist between sessions. Tools registered: - ``set_priority_sender(email)`` — flag a sender as always urgent - ``set_low_priority_sender(email)`` — flag a sender as always low-priority - ``set_category_default(category, action)`` — per-category default action -- ``clear_session_preferences()`` — wipe in-process preferences +- ``clear_session_preferences()`` — wipe preferences (in-process and persisted) The first three tools are consulted by ``triage_inbox`` and -``pre_scan_inbox`` (see ``read_tools.py``). ``clear_session_preferences`` -exists so the user can reset without restarting the agent. +``pre_scan_inbox`` (see ``read_tools.py``). """ from __future__ import annotations @@ -26,15 +26,23 @@ import json from typing import Any, Dict -from gaia.agents.base.tools import tool from gaia_agent_email.tools.triage_heuristics import ( CATEGORY_INFORMATIONAL, CATEGORY_LOW_PRIORITY, ) + +from gaia.agents.base.tools import tool from gaia.logger import get_logger log = get_logger(__name__) +# Stable entity key used to store the single preferences record in MemoryStore. +# Using a unique entity means get_by_entity() always returns at most one record, +# giving us a clean upsert path: retrieve → update(id) if exists, store() if not. +_PREF_ENTITY = "email:preferences" +_PREF_DOMAIN = "email_agent_prefs" +_PREF_CATEGORY = "preference" + # Categories that accept a session-level default action. Keep this set # small on purpose — defaulting "urgent" or "actionable" to "archive" @@ -103,6 +111,49 @@ def _snapshot(prefs: Dict[str, Any]) -> Dict[str, Any]: } +def _persist_preferences(agent: Any) -> None: + """Write the current snapshot to MemoryStore under a stable entity key. + + Uses an idempotent upsert: + - If a record already exists for ``_PREF_ENTITY``, update it in-place + (``store.update(id, content=...)``) so the record count stays at one. + - If no record exists yet, create it with ``store.store(...)``. + + When ``agent._memory_store is None`` (memory disabled via + ``GAIA_MEMORY_DISABLED=1`` or Lemonade unreachable at startup), + the write is silently skipped — preferences remain in-process only. + This is an explicit opt-out / degraded state, not a generic fallback. + + When the agent is in incognito mode (``agent._incognito is True``), + the write is also skipped — incognito sessions never write to persistent + storage, matching the MemoryMixin invariant. + """ + store = getattr(agent, "_memory_store", None) + if store is None or getattr(agent, "_incognito", False): + return + + prefs = getattr(agent, "_session_preferences", None) + if prefs is None: + return + + content = json.dumps(_snapshot(prefs)) + context = getattr(agent, "_memory_context", "email") + + existing = store.get_by_entity(_PREF_ENTITY) + if existing: + store.update(existing[0]["id"], content=content) + else: + store.store( + category=_PREF_CATEGORY, + content=content, + domain=_PREF_DOMAIN, + entity=_PREF_ENTITY, + context=context, + confidence=1.0, + source="preference_tools", + ) + + class PreferenceToolsMixin: """Mixin that registers session-preference tools. @@ -111,12 +162,49 @@ class PreferenceToolsMixin: via a closure over the agent instance. """ + def _load_persisted_preferences(self) -> None: + """Seed ``_session_preferences`` from the persisted memory record. + + Called from ``EmailTriageAgent.__init__`` after ``init_memory()`` so + that preferences set in a previous session are immediately available. + + When no record exists (first run or after ``clear_session_preferences`` + wiped everything) or when memory is disabled, the empty default set by + ``init_session_preferences()`` is left untouched. + """ + store = getattr(self, "_memory_store", None) + if store is None: + return + + existing = store.get_by_entity(_PREF_ENTITY) + if not existing: + return + + try: + data = json.loads(existing[0]["content"]) + except (json.JSONDecodeError, KeyError, TypeError): + log.warning( + "preference_tools: failed to parse persisted preferences; " + "starting with empty defaults" + ) + return + + prefs = getattr(self, "_session_preferences", None) + if prefs is None: + return + + _validate_session_preferences(prefs) + # lists → sets for the two sender fields + prefs["priority_senders"] = set(data.get("priority_senders") or []) + prefs["low_priority_senders"] = set(data.get("low_priority_senders") or []) + prefs["category_defaults"] = dict(data.get("category_defaults") or {}) + def _register_preference_tools(self) -> None: agent = self # captured for live access to ``_session_preferences`` @tool def set_priority_sender(email: str) -> str: - """Mark a sender as always urgent for this session. + """Mark a sender as always urgent across sessions. Senders flagged here bypass the triage heuristic entirely — ``triage_inbox`` and ``pre_scan_inbox`` will classify their @@ -124,7 +212,7 @@ def set_priority_sender(email: str) -> str: Gmail labels. Useful for high-signal senders the heuristic can't recognize on its own (e.g. ``boss@company.com``). - **Session-scoped — preferences are wiped on agent restart.** + Preferences persist across agent restarts. Args: email: A bare email address, e.g. ``alice@example.com``. @@ -145,6 +233,7 @@ def set_priority_sender(email: str) -> str: # priority designation supersedes — silently drop the # contradicting flag. prefs["low_priority_senders"].discard(normalized) + _persist_preferences(agent) return _envelope_ok( { "added": normalized, @@ -157,14 +246,14 @@ def set_priority_sender(email: str) -> str: @tool def set_low_priority_sender(email: str) -> str: - """Mark a sender as always low-priority for this session. + """Mark a sender as always low-priority across sessions. Senders flagged here are classified as ``low priority`` and surfaced in ``pre_scan_inbox``'s ``suggested_archives`` section. Useful for newsletters or bot accounts the heuristic can't recognize on its own. - **Session-scoped — preferences are wiped on agent restart.** + Preferences persist across agent restarts. Args: email: A bare email address, e.g. @@ -183,6 +272,7 @@ def set_low_priority_sender(email: str) -> str: # Same conflict resolution as set_priority_sender — # later wins. prefs["priority_senders"].discard(normalized) + _persist_preferences(agent) return _envelope_ok( { "added": normalized, @@ -195,7 +285,7 @@ def set_low_priority_sender(email: str) -> str: @tool def set_category_default(category: str, action: str) -> str: - """Set a default action for a triage category. + """Set a default action for a triage category, persisted across restarts. Currently supports two categories — ``informational`` and ``low priority`` — with two possible actions: ``archive`` @@ -205,7 +295,7 @@ def set_category_default(category: str, action: str) -> str: ``keep``: the safety cost of silently archiving important mail is too high. - **Session-scoped — preferences are wiped on agent restart.** + Preferences persist across agent restarts. Args: category: One of ``"informational"`` or ``"low priority"``. @@ -232,6 +322,7 @@ def set_category_default(category: str, action: str) -> str: prefs["category_defaults"].pop(cat, None) else: prefs["category_defaults"][cat] = act + _persist_preferences(agent) return _envelope_ok( { "category": cat, @@ -245,11 +336,12 @@ def set_category_default(category: str, action: str) -> str: @tool def clear_session_preferences() -> str: - """Wipe in-process session preferences. + """Wipe preferences in-process and from persistent storage. Resets ``priority_senders``, ``low_priority_senders``, and ``category_defaults`` to empty without restarting the agent. - Use when the user wants a fresh triage run with no overrides. + The cleared state is also persisted so a fresh session starts + empty. Use when the user wants a clean slate. Mutates the existing dict in place rather than rebinding to a fresh one. Read-side tools currently look up the dict via @@ -266,6 +358,7 @@ def clear_session_preferences() -> str: prefs["priority_senders"].clear() prefs["low_priority_senders"].clear() prefs["category_defaults"].clear() + _persist_preferences(agent) return _envelope_ok( { "cleared": True, diff --git a/hub/agents/python/email/gaia_agent_email/tools/profile_tools.py b/hub/agents/python/email/gaia_agent_email/tools/profile_tools.py new file mode 100644 index 000000000..64ed8736a --- /dev/null +++ b/hub/agents/python/email/gaia_agent_email/tools/profile_tools.py @@ -0,0 +1,231 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT +"""Inbox-profiling tools mixin for ``EmailTriageAgent``. + +Builds a per-sender frequency profile from remembered interaction history +so the agent can tell the user which senders write most often and in what +category. + +Interaction records are stored in MemoryStore as one rolling per-sender +record (entity ``email:interaction:``). Each record's ``content`` +is a JSON object:: + + { + "sender": "alice@example.com", + "count": 7, + "category_counts": {"urgent": 3, "actionable": 4}, + "last_ts": "2026-06-12T14:23:00+00:00" + } + +Bounded and idempotent by design — ``_record_interaction`` always does an +upsert into the single per-sender record, so the record count is O(senders) +regardless of how many emails arrive. #1290 can reuse ``_read_interactions`` +directly to build richer views. + +Tools registered: + +- ``profile_inbox()`` — reads all interaction records and returns a profile: + per-sender frequency + dominant category, sorted by volume descending. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Dict, List + +from gaia.agents.base.tools import tool +from gaia.logger import get_logger + +log = get_logger(__name__) + +# Stable entity prefix for per-sender interaction records. +# Entity = f"{_INTERACTION_ENTITY_PREFIX}{sender_email}" +# This prefix is intentionally namespace-safe (colon-separated, no slash) +# so that ``get_by_entity`` prefix lookups work consistently. +_INTERACTION_ENTITY_PREFIX = "email:interaction:" +_INTERACTION_DOMAIN = "email_agent_interactions" +_INTERACTION_CATEGORY = "interaction" + +# Sanity ceiling on how many distinct-sender interaction records we read in +# one profiling pass. This is NOT a silent truncation cap: if a real inbox +# ever has more distinct senders than this, ``_read_interactions`` logs a +# WARNING so the coverage loss is loud (per the repo's no-silent-fallbacks +# rule) and the ceiling can be raised deliberately. 50k senders is far beyond +# any realistic single mailbox, so in practice it is never hit. +_MAX_INTERACTION_RECORDS = 50000 + + +def _envelope_ok(data: Any) -> str: + return json.dumps({"ok": True, "data": data}, default=str) + + +def _envelope_err(message: str) -> str: + return json.dumps({"ok": False, "error": message}) + + +def _now_iso() -> str: + return datetime.now(tz=timezone.utc).isoformat() + + +def _dominant_category(category_counts: Dict[str, int]) -> str: + """Return the category with the highest count; ties broken by last alphabetically.""" + if not category_counts: + return "" + return max(category_counts, key=lambda cat: (category_counts[cat], cat)) + + +class ProfileToolsMixin: + """Mixin that registers inbox-profiling tools. + + State-free at construction time — reads ``self._memory_store`` via + a closure captured when ``_register_profile_tools()`` is called. + """ + + def _record_interaction(self, sender: str, category: str) -> None: + """Update the rolling interaction record for *sender*. + + - When ``_memory_store`` is None (memory disabled), silently skips. + - Does an upsert: retrieve the single per-sender record, update JSON + in-place, then call ``store.update(id, content=...)``. When no record + exists yet, ``store.store(...)`` creates the initial one. + - One record per sender — no unbounded accumulation. + """ + store = getattr(self, "_memory_store", None) + if store is None: + return + if not sender or not category: + return + + entity = f"{_INTERACTION_ENTITY_PREFIX}{sender}" + context = getattr(self, "_memory_context", "email") + now = _now_iso() + + existing = store.get_by_entity(entity) + if existing: + row = existing[0] + try: + payload = json.loads(row["content"]) + except (json.JSONDecodeError, KeyError): + payload = { + "sender": sender, + "count": 0, + "category_counts": {}, + "last_ts": now, + } + payload["count"] = int(payload.get("count", 0)) + 1 + cats = dict(payload.get("category_counts") or {}) + cats[category] = int(cats.get(category, 0)) + 1 + payload["category_counts"] = cats + payload["last_ts"] = now + store.update(row["id"], content=json.dumps(payload)) + else: + payload = { + "sender": sender, + "count": 1, + "category_counts": {category: 1}, + "last_ts": now, + } + store.store( + category=_INTERACTION_CATEGORY, + content=json.dumps(payload), + domain=_INTERACTION_DOMAIN, + entity=entity, + context=context, + confidence=1.0, + source="profile_tools", + ) + + def _read_interactions(self) -> List[Dict[str, Any]]: + """Return all per-sender interaction records as parsed dicts. + + Each element has: ``sender``, ``count``, ``category_counts``, + ``last_ts``. Returns an empty list when memory is disabled or no + records exist. Designed for reuse by #1290 and other callers. + + Reads up to ``_MAX_INTERACTION_RECORDS`` distinct-sender records. That + ceiling is far beyond any realistic mailbox; if it is ever reached we + log a WARNING (never silently drop coverage) so it can be raised + deliberately. + """ + store = getattr(self, "_memory_store", None) + if store is None: + return [] + rows = store.get_by_category( + _INTERACTION_CATEGORY, + domain=_INTERACTION_DOMAIN, + limit=_MAX_INTERACTION_RECORDS, + ) + if len(rows) >= _MAX_INTERACTION_RECORDS: + log.warning( + "profile_tools: interaction-record read hit the %d-record " + "ceiling; the inbox profile may be incomplete. Raise " + "_MAX_INTERACTION_RECORDS to cover all senders.", + _MAX_INTERACTION_RECORDS, + ) + out: List[Dict[str, Any]] = [] + for row in rows: + try: + payload = json.loads(row["content"]) + # Ensure required keys are present + out.append( + { + "sender": payload.get("sender", ""), + "count": int(payload.get("count", 0)), + "category_counts": dict(payload.get("category_counts") or {}), + "last_ts": payload.get("last_ts", ""), + } + ) + except (json.JSONDecodeError, KeyError, TypeError): + log.warning( + "profile_tools: skipping malformed interaction record %s", + row.get("id"), + ) + return out + + def _register_profile_tools(self) -> None: + agent = self # closure over the agent for live access to _memory_store + + @tool + def profile_inbox() -> str: + """Summarize sender frequency and typical category from interaction history. + + Reads all remembered interaction records built up as the agent + has triaged messages. Returns a profile ranked by message volume + (highest first), with per-sender total count and dominant + category. + + Returns: + JSON envelope with ``{"ok": true, "data": {"top_senders": [...], + "total_messages": N}}`` where each top-senders element has + ``sender``, ``count``, ``dominant_category``, + ``category_counts``, and ``last_ts`` (ISO-8601 timestamp of + the most recent interaction). Returns an empty profile + (ok=True, top_senders=[]) when memory is disabled or no + history exists. + """ + try: + records = agent._read_interactions() + if not records: + return _envelope_ok({"top_senders": [], "total_messages": 0}) + # Sort descending by count, then alphabetically for stable output. + sorted_records = sorted( + records, key=lambda r: (-r["count"], r["sender"]) + ) + top_senders = [ + { + "sender": r["sender"], + "count": r["count"], + "dominant_category": _dominant_category(r["category_counts"]), + "category_counts": r["category_counts"], + "last_ts": r["last_ts"], + } + for r in sorted_records + ] + total = sum(r["count"] for r in records) + return _envelope_ok( + {"top_senders": top_senders, "total_messages": total} + ) + except Exception as exc: + log.exception("profile_inbox failed: %s", type(exc).__name__) + return _envelope_err(f"{type(exc).__name__}: {exc}") diff --git a/hub/agents/python/email/tests/test_email_inbox_profiling.py b/hub/agents/python/email/tests/test_email_inbox_profiling.py new file mode 100644 index 000000000..7075f5926 --- /dev/null +++ b/hub/agents/python/email/tests/test_email_inbox_profiling.py @@ -0,0 +1,431 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT +""" +Inbox-profiling tests for EmailTriageAgent (#1289). + +Acceptance criteria covered: +- AC (main): A ``profile_inbox`` capability summarizes sender/category patterns + from remembered interaction history. +- Test-AC: Profile reflects seeded historical interactions in a fixture inbox. + +Additional test coverage: +- record-on-triage: triaging one message writes one interaction record. +- memory-disabled: ``profile_inbox`` returns a clean empty/disabled result, + no error. +- bounding/idempotency: recording the same sender N times keeps a single + rolling record (no unbounded rows). + +Embedder is mocked (same pattern as test_email_memory.py) so tests run +hermetically without Lemonade. +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +# --------------------------------------------------------------------------- +# Path / import bootstrap +# --------------------------------------------------------------------------- + +# parents[0] = tests/, [1] = email/, [2] = python/, [3] = agents/, +# [4] = hub/, [5] = repo-root +_REPO_ROOT = Path(__file__).resolve().parents[5] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +pytest.importorskip("gaia_agent_email") + +from gaia_agent_email.agent import EmailTriageAgent # noqa: E402 +from gaia_agent_email.config import EmailAgentConfig # noqa: E402 + +# --------------------------------------------------------------------------- +# Minimal fake backends +# --------------------------------------------------------------------------- + + +class _MinimalMailBackend: + pass + + +class _MinimalCalendarBackend: + pass + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +EMBEDDING_DIM = 768 + +_INTERACTION_ENTITY_PREFIX = "email:interaction:" +_INTERACTION_DOMAIN = "email_agent_interactions" +_INTERACTION_CATEGORY = "interaction" + + +def _fake_embed(text: str) -> np.ndarray: + """Deterministic unit vector — keeps FAISS happy.""" + vec = np.ones(EMBEDDING_DIM, dtype=np.float32) + vec /= np.linalg.norm(vec) + return vec + + +def _build_agent(tmp_path: Path, *, memory_disabled: bool = False) -> EmailTriageAgent: + """Build EmailTriageAgent with injected fakes and tmp db paths. + + When *memory_disabled* is True sets GAIA_MEMORY_DISABLED=1 before + construction and restores the env var after. Otherwise the Lemonade + embedder is mocked so init_memory succeeds without a running server. + """ + cfg = EmailAgentConfig( + gmail_backend=_MinimalMailBackend(), + calendar_backend=_MinimalCalendarBackend(), + db_path=str(tmp_path / "state.db"), + memory_db_path=str(tmp_path / "memory.db"), + silent_mode=True, + debug=False, + ) + + def _do_build(): + with patch("gaia.agents.base.agent.AgentSDK") as mock_sdk: + mock_sdk.return_value = MagicMock() + return EmailTriageAgent(config=cfg) + + if memory_disabled: + old = os.environ.get("GAIA_MEMORY_DISABLED") + os.environ["GAIA_MEMORY_DISABLED"] = "1" + try: + return _do_build() + finally: + if old is None: + del os.environ["GAIA_MEMORY_DISABLED"] + else: + os.environ["GAIA_MEMORY_DISABLED"] = old + else: + with ( + patch( + "gaia.agents.base.memory.MemoryMixin._get_embedder", + return_value=MagicMock(), + ), + patch( + "gaia.agents.base.memory.MemoryMixin._embed_text", + side_effect=_fake_embed, + ), + patch( + "gaia.agents.base.memory.MemoryMixin._backfill_embeddings", + return_value=0, + ), + patch( + "gaia.agents.base.memory.MemoryMixin._rebuild_faiss_index", + ), + patch( + "gaia.agents.base.memory.MemoryMixin.init_system_context", + ), + ): + return _do_build() + + +def _invoke_profile_inbox(agent: EmailTriageAgent) -> dict: + """Call the profile_inbox tool directly via the tool registry.""" + from gaia.agents.base.tools import _TOOL_REGISTRY + + entry = _TOOL_REGISTRY.get("profile_inbox") + assert entry is not None, "profile_inbox tool not registered" + result = entry["function"]() + return json.loads(result) + + +def _seed_interaction( + agent: EmailTriageAgent, + sender: str, + category: str, + count: int = 1, +) -> None: + """Directly call the private _record_interaction helper N times to seed history.""" + for _ in range(count): + agent._record_interaction(sender, category) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestProfileInboxToolRegistered: + """AC: profile_inbox capability is registered after construction.""" + + def test_profile_inbox_in_registry(self, tmp_path): + """profile_inbox appears in the tool registry after construction.""" + from gaia.agents.base.tools import _TOOL_REGISTRY + + agent = _build_agent(tmp_path) + try: + assert ( + "profile_inbox" in _TOOL_REGISTRY + ), f"profile_inbox not in registry. Keys: {sorted(_TOOL_REGISTRY)}" + finally: + agent.close_db() + + def test_profile_inbox_alongside_other_tools(self, tmp_path): + """profile_inbox coexists with existing email tools.""" + from gaia.agents.base.tools import _TOOL_REGISTRY + + agent = _build_agent(tmp_path) + try: + assert "list_inbox" in _TOOL_REGISTRY + assert "profile_inbox" in _TOOL_REGISTRY + finally: + agent.close_db() + + +class TestProfileReflectsSeededHistory: + """Test-AC: profile reflects seeded historical interactions.""" + + def test_seeded_sender_appears_in_profile(self, tmp_path): + """After seeding 3 interactions for a sender, profile reports count=3.""" + agent = _build_agent(tmp_path) + try: + _seed_interaction(agent, "boss@company.com", "urgent", count=3) + result = _invoke_profile_inbox(agent) + assert result["ok"] is True, f"profile_inbox failed: {result}" + + data = result["data"] + senders = {s["sender"]: s for s in data["top_senders"]} + assert ( + "boss@company.com" in senders + ), f"boss@company.com not in top_senders: {data['top_senders']}" + assert ( + senders["boss@company.com"]["count"] == 3 + ), f"Expected count=3, got: {senders['boss@company.com']}" + finally: + agent.close_db() + + def test_dominant_category_correct(self, tmp_path): + """Dominant category for a sender matches the most-seen category.""" + agent = _build_agent(tmp_path) + try: + _seed_interaction(agent, "newsletter@example.com", "low priority", count=5) + _seed_interaction(agent, "newsletter@example.com", "informational", count=2) + result = _invoke_profile_inbox(agent) + assert result["ok"] is True + + senders = {s["sender"]: s for s in result["data"]["top_senders"]} + assert "newsletter@example.com" in senders + assert ( + senders["newsletter@example.com"]["dominant_category"] == "low priority" + ), f"Expected dominant_category=low priority: {senders['newsletter@example.com']}" + finally: + agent.close_db() + + def test_multiple_senders_all_present(self, tmp_path): + """All seeded senders appear in the profile.""" + agent = _build_agent(tmp_path) + try: + _seed_interaction(agent, "alice@company.com", "urgent", count=2) + _seed_interaction(agent, "bob@company.com", "actionable", count=1) + _seed_interaction(agent, "newsletter@news.com", "low priority", count=4) + result = _invoke_profile_inbox(agent) + assert result["ok"] is True + + present = {s["sender"] for s in result["data"]["top_senders"]} + assert "alice@company.com" in present + assert "bob@company.com" in present + assert "newsletter@news.com" in present + finally: + agent.close_db() + + def test_empty_profile_on_no_history(self, tmp_path): + """profile_inbox with no interactions returns ok with empty top_senders.""" + agent = _build_agent(tmp_path) + try: + result = _invoke_profile_inbox(agent) + assert result["ok"] is True + assert ( + result["data"]["top_senders"] == [] + ), f"Expected empty top_senders, got: {result['data']['top_senders']}" + finally: + agent.close_db() + + def test_profile_total_messages_correct(self, tmp_path): + """total_messages in profile equals sum of all interaction counts.""" + agent = _build_agent(tmp_path) + try: + _seed_interaction(agent, "alice@company.com", "urgent", count=3) + _seed_interaction(agent, "bob@company.com", "informational", count=2) + result = _invoke_profile_inbox(agent) + assert result["ok"] is True + assert ( + result["data"]["total_messages"] == 5 + ), f"Expected total_messages=5: {result['data']}" + finally: + agent.close_db() + + +class TestRecordOnTriage: + """record-on-triage: calling triage writes interaction records.""" + + def test_record_interaction_creates_memory_record(self, tmp_path): + """_record_interaction for a sender creates exactly one memory record.""" + agent = _build_agent(tmp_path) + try: + agent._record_interaction("boss@company.com", "urgent") + store = agent._memory_store + assert store is not None + + entity = f"{_INTERACTION_ENTITY_PREFIX}boss@company.com" + rows = store.get_by_entity(entity) + assert ( + len(rows) == 1 + ), f"Expected 1 interaction record, got {len(rows)}: {rows}" + payload = json.loads(rows[0]["content"]) + assert payload["sender"] == "boss@company.com" + assert payload["count"] == 1 + assert payload["category_counts"]["urgent"] == 1 + finally: + agent.close_db() + + def test_record_interaction_increments_count(self, tmp_path): + """Recording the same sender twice increments count to 2 in the same record.""" + agent = _build_agent(tmp_path) + try: + agent._record_interaction("alice@company.com", "actionable") + agent._record_interaction("alice@company.com", "actionable") + + entity = f"{_INTERACTION_ENTITY_PREFIX}alice@company.com" + rows = agent._memory_store.get_by_entity(entity) + assert len(rows) == 1, f"Expected 1 record, got {len(rows)}" + payload = json.loads(rows[0]["content"]) + assert payload["count"] == 2 + assert payload["category_counts"]["actionable"] == 2 + finally: + agent.close_db() + + def test_record_interaction_tracks_multiple_categories(self, tmp_path): + """Different categories for the same sender are tracked separately.""" + agent = _build_agent(tmp_path) + try: + agent._record_interaction("mixed@company.com", "urgent") + agent._record_interaction("mixed@company.com", "informational") + agent._record_interaction("mixed@company.com", "urgent") + + entity = f"{_INTERACTION_ENTITY_PREFIX}mixed@company.com" + rows = agent._memory_store.get_by_entity(entity) + assert len(rows) == 1 + payload = json.loads(rows[0]["content"]) + assert payload["count"] == 3 + assert payload["category_counts"]["urgent"] == 2 + assert payload["category_counts"]["informational"] == 1 + finally: + agent.close_db() + + +class TestMemoryDisabled: + """memory-disabled: profile_inbox returns clean result, no error.""" + + def test_profile_inbox_memory_disabled_ok(self, tmp_path): + """When GAIA_MEMORY_DISABLED=1, profile_inbox returns ok with empty profile.""" + agent = _build_agent(tmp_path, memory_disabled=True) + try: + assert agent._memory_store is None + result = _invoke_profile_inbox(agent) + assert ( + result["ok"] is True + ), f"Expected ok=True when memory disabled, got: {result}" + data = result["data"] + assert data["top_senders"] == [] + assert data["total_messages"] == 0 + finally: + agent.close_db() + + def test_record_interaction_memory_disabled_no_error(self, tmp_path): + """_record_interaction silently skips when memory is disabled (no exception).""" + agent = _build_agent(tmp_path, memory_disabled=True) + try: + assert agent._memory_store is None + # Must not raise + agent._record_interaction("someone@example.com", "urgent") + finally: + agent.close_db() + + +class TestBoundingAndIdempotency: + """Bounding/idempotency: one rolling record per sender, no unbounded accumulation.""" + + def test_n_recordings_keep_single_record(self, tmp_path): + """Recording the same sender 10 times produces exactly 1 memory record.""" + agent = _build_agent(tmp_path) + try: + for _ in range(10): + agent._record_interaction("repeat@company.com", "informational") + + entity = f"{_INTERACTION_ENTITY_PREFIX}repeat@company.com" + rows = agent._memory_store.get_by_entity(entity) + assert ( + len(rows) == 1 + ), f"Expected exactly 1 record after 10 recordings, got {len(rows)}: {rows}" + payload = json.loads(rows[0]["content"]) + assert ( + payload["count"] == 10 + ), f"count should be 10, got: {payload['count']}" + finally: + agent.close_db() + + def test_different_senders_each_get_own_record(self, tmp_path): + """Three different senders produce three distinct records.""" + agent = _build_agent(tmp_path) + try: + agent._record_interaction("a@company.com", "urgent") + agent._record_interaction("b@company.com", "actionable") + agent._record_interaction("c@company.com", "informational") + + for sender in ("a@company.com", "b@company.com", "c@company.com"): + entity = f"{_INTERACTION_ENTITY_PREFIX}{sender}" + rows = agent._memory_store.get_by_entity(entity) + assert ( + len(rows) == 1 + ), f"Expected 1 record for {sender}, got {len(rows)}" + finally: + agent.close_db() + + def test_interactions_persist_across_restart(self, tmp_path): + """Interaction records survive an agent restart (same memory.db path).""" + agent_a = _build_agent(tmp_path) + try: + agent_a._record_interaction("persist@company.com", "urgent") + agent_a._record_interaction("persist@company.com", "urgent") + finally: + agent_a.close_db() + + # Re-open the same memory DB directly and verify the record survived. + from gaia.agents.base.memory_store import MemoryStore + + store = MemoryStore(tmp_path / "memory.db") + entity = f"{_INTERACTION_ENTITY_PREFIX}persist@company.com" + rows = store.get_by_entity(entity) + assert len(rows) == 1, f"Expected 1 record after restart, got {len(rows)}" + payload = json.loads(rows[0]["content"]) + assert payload["count"] == 2 + + def test_read_interactions_returns_all_senders(self, tmp_path): + """_read_interactions() returns all seeded sender records.""" + agent = _build_agent(tmp_path) + try: + agent._record_interaction("x@company.com", "urgent") + agent._record_interaction("y@company.com", "low priority") + agent._record_interaction("z@company.com", "informational") + + records = agent._read_interactions() + senders = {r["sender"] for r in records} + assert senders == { + "x@company.com", + "y@company.com", + "z@company.com", + }, f"Expected 3 senders, got: {senders}" + finally: + agent.close_db() diff --git a/hub/agents/python/email/tests/test_email_preferences_persist.py b/hub/agents/python/email/tests/test_email_preferences_persist.py new file mode 100644 index 000000000..e90d1c830 --- /dev/null +++ b/hub/agents/python/email/tests/test_email_preferences_persist.py @@ -0,0 +1,415 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT +""" +Cross-session preference persistence tests for EmailTriageAgent. + +Acceptance criteria covered: +- AC (main): priority/low-priority sender set in one session is present in a fresh session. +- AC (main): category-default action persists across restarts. +- Test-AC: priority sender set in session A is in _session_preferences in session B. +- Test-AC: clear_session_preferences clears persistence so a new session starts empty. + +Embedder is mocked out (same pattern as test_email_memory.py) so tests run +hermetically without Lemonade. GAIA_MEMORY_DISABLED=1 is NOT used here because +we need _memory_store to be set for preference persistence to work. +""" + +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +# --------------------------------------------------------------------------- +# Path / import bootstrap +# --------------------------------------------------------------------------- + +# parents[0] = tests/, [1] = email/, [2] = python/, [3] = agents/, +# [4] = hub/, [5] = repo-root +_REPO_ROOT = Path(__file__).resolve().parents[5] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +pytest.importorskip("gaia_agent_email") + +from gaia_agent_email.agent import EmailTriageAgent # noqa: E402 +from gaia_agent_email.config import EmailAgentConfig # noqa: E402 + +# --------------------------------------------------------------------------- +# Minimal fake backends +# --------------------------------------------------------------------------- + + +class _MinimalMailBackend: + pass + + +class _MinimalCalendarBackend: + pass + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +EMBEDDING_DIM = 768 + +_PREF_ENTITY = "email:preferences" +_PREF_DOMAIN = "email_agent_prefs" +_PREF_CATEGORY = "preference" + + +def _fake_embed(text: str) -> np.ndarray: + """Deterministic unit vector — keeps FAISS happy.""" + vec = np.ones(EMBEDDING_DIM, dtype=np.float32) + vec /= np.linalg.norm(vec) + return vec + + +def _build_agent(tmp_path: Path) -> EmailTriageAgent: + """Build EmailTriageAgent with injected fakes and tmp db paths. + + Mocks the Lemonade embedding endpoint so init_memory succeeds + without a running Lemonade server (FTS5-only store/search path). + """ + cfg = EmailAgentConfig( + gmail_backend=_MinimalMailBackend(), + calendar_backend=_MinimalCalendarBackend(), + db_path=str(tmp_path / "state.db"), + memory_db_path=str(tmp_path / "memory.db"), + silent_mode=True, + debug=False, + ) + + with ( + patch("gaia.agents.base.agent.AgentSDK") as mock_sdk, + patch( + "gaia.agents.base.memory.MemoryMixin._get_embedder", + return_value=MagicMock(), + ), + patch( + "gaia.agents.base.memory.MemoryMixin._embed_text", + side_effect=_fake_embed, + ), + patch( + "gaia.agents.base.memory.MemoryMixin._backfill_embeddings", + return_value=0, + ), + patch( + "gaia.agents.base.memory.MemoryMixin._rebuild_faiss_index", + ), + patch( + "gaia.agents.base.memory.MemoryMixin.init_system_context", + ), + ): + mock_sdk.return_value = MagicMock() + return EmailTriageAgent(config=cfg) + + +def _invoke_set_priority_sender(email: str) -> dict: + """Call the set_priority_sender tool directly via the tool registry.""" + from gaia.agents.base.tools import _TOOL_REGISTRY + + entry = _TOOL_REGISTRY.get("set_priority_sender") + assert entry is not None, "set_priority_sender not registered" + result = entry["function"](email) + return json.loads(result) + + +def _invoke_set_low_priority_sender(email: str) -> dict: + from gaia.agents.base.tools import _TOOL_REGISTRY + + entry = _TOOL_REGISTRY.get("set_low_priority_sender") + assert entry is not None, "set_low_priority_sender not registered" + result = entry["function"](email) + return json.loads(result) + + +def _invoke_set_category_default(category: str, action: str) -> dict: + from gaia.agents.base.tools import _TOOL_REGISTRY + + entry = _TOOL_REGISTRY.get("set_category_default") + assert entry is not None, "set_category_default not registered" + result = entry["function"](category, action) + return json.loads(result) + + +def _invoke_clear_session_preferences() -> dict: + from gaia.agents.base.tools import _TOOL_REGISTRY + + entry = _TOOL_REGISTRY.get("clear_session_preferences") + assert entry is not None, "clear_session_preferences not registered" + result = entry["function"]() + return json.loads(result) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestPrioritySenderPersistsAcrossRestart: + """AC: priority sender set in session A is present in session B.""" + + def test_priority_sender_survives_restart(self, tmp_path): + """Set a priority sender in agent A; it must be in _session_preferences + of a freshly-constructed agent B using the same memory.db.""" + # Session A — set sender and close + agent_a = _build_agent(tmp_path) + try: + result = _invoke_set_priority_sender("boss@company.com") + assert result["ok"] is True, f"set_priority_sender failed: {result}" + finally: + agent_a.close_db() + + # Session B — fresh instance, same db + agent_b = _build_agent(tmp_path) + try: + assert ( + "boss@company.com" in agent_b._session_preferences["priority_senders"] + ), ( + "priority sender not restored from memory after restart. " + f"Got: {agent_b._session_preferences['priority_senders']}" + ) + finally: + agent_b.close_db() + + def test_low_priority_sender_survives_restart(self, tmp_path): + """Set a low-priority sender in session A; it persists into session B.""" + agent_a = _build_agent(tmp_path) + try: + result = _invoke_set_low_priority_sender("newsletter@stripe.com") + assert result["ok"] is True, f"set_low_priority_sender failed: {result}" + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + assert ( + "newsletter@stripe.com" + in agent_b._session_preferences["low_priority_senders"] + ), ( + "low_priority sender not restored from memory after restart. " + f"Got: {agent_b._session_preferences['low_priority_senders']}" + ) + finally: + agent_b.close_db() + + def test_multiple_senders_survive_restart(self, tmp_path): + """Multiple priority and low-priority senders all persist.""" + agent_a = _build_agent(tmp_path) + try: + _invoke_set_priority_sender("boss@company.com") + _invoke_set_priority_sender("cto@company.com") + _invoke_set_low_priority_sender("news@example.com") + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + assert ( + "boss@company.com" in agent_b._session_preferences["priority_senders"] + ) + assert "cto@company.com" in agent_b._session_preferences["priority_senders"] + assert ( + "news@example.com" + in agent_b._session_preferences["low_priority_senders"] + ) + finally: + agent_b.close_db() + + def test_no_duplicate_memory_records(self, tmp_path): + """Writing preferences multiple times does not accumulate duplicate records.""" + agent_a = _build_agent(tmp_path) + try: + # Write the same sender several times + for _ in range(5): + _invoke_set_priority_sender("boss@company.com") + finally: + agent_a.close_db() + + # Re-open the memory store directly and count records for the entity + from gaia.agents.base.memory_store import MemoryStore + + store = MemoryStore(tmp_path / "memory.db") + rows = store.get_by_entity(_PREF_ENTITY) + assert ( + len(rows) == 1 + ), f"Expected exactly 1 memory record for {_PREF_ENTITY!r}, got {len(rows)}: {rows}" + + +class TestCategoryDefaultPersistsAcrossRestart: + """AC: category-default action persists across restarts.""" + + def test_category_default_survives_restart(self, tmp_path): + """Set informational→archive in session A; it's present in session B.""" + agent_a = _build_agent(tmp_path) + try: + result = _invoke_set_category_default("informational", "archive") + assert result["ok"] is True, f"set_category_default failed: {result}" + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + defaults = agent_b._session_preferences["category_defaults"] + assert ( + defaults.get("informational") == "archive" + ), f"category_default not restored. Got: {defaults}" + finally: + agent_b.close_db() + + def test_category_default_keep_clears_persisted_archive(self, tmp_path): + """Setting a category to 'keep' removes the archive preference on restart.""" + # Session A: set archive, then flip back to keep + agent_a = _build_agent(tmp_path) + try: + _invoke_set_category_default("informational", "archive") + _invoke_set_category_default("informational", "keep") + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + defaults = agent_b._session_preferences["category_defaults"] + assert "informational" not in defaults, ( + f"'keep' should remove informational from category_defaults, " + f"but got: {defaults}" + ) + finally: + agent_b.close_db() + + +class TestClearPersistenceAcrossRestart: + """AC: clear_session_preferences clears persistence so session B starts empty.""" + + def test_clear_wipes_persisted_preferences(self, tmp_path): + """Set sender + default, then clear; session B starts empty.""" + agent_a = _build_agent(tmp_path) + try: + _invoke_set_priority_sender("boss@company.com") + _invoke_set_category_default("informational", "archive") + result = _invoke_clear_session_preferences() + assert result["ok"] is True, f"clear_session_preferences failed: {result}" + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + prefs = agent_b._session_preferences + assert len(prefs["priority_senders"]) == 0, ( + f"priority_senders should be empty after clear+restart, " + f"got: {prefs['priority_senders']}" + ) + assert len(prefs["low_priority_senders"]) == 0, ( + f"low_priority_senders should be empty after clear+restart, " + f"got: {prefs['low_priority_senders']}" + ) + assert len(prefs["category_defaults"]) == 0, ( + f"category_defaults should be empty after clear+restart, " + f"got: {prefs['category_defaults']}" + ) + finally: + agent_b.close_db() + + +class TestIncognitoGate: + """When _incognito=True, preference mutations work in-process but are NOT persisted.""" + + def test_incognito_preferences_not_persisted(self, tmp_path): + """Setting a priority sender while incognito must NOT be written to the store. + + A subsequent non-incognito session must NOT see the incognito-session sender. + """ + # Session A — incognito; set a sender + agent_a = _build_agent(tmp_path) + try: + agent_a._incognito = True + result = _invoke_set_priority_sender("secret@example.com") + assert result["ok"] is True, f"set_priority_sender failed: {result}" + # In-process state is mutated even in incognito + assert ( + "secret@example.com" in agent_a._session_preferences["priority_senders"] + ) + finally: + agent_a.close_db() + + # Session B — non-incognito; sender must NOT be present + agent_b = _build_agent(tmp_path) + try: + assert ( + "secret@example.com" + not in agent_b._session_preferences["priority_senders"] + ), ( + "Incognito session must not persist preferences to the memory store. " + f"Got: {agent_b._session_preferences['priority_senders']}" + ) + finally: + agent_b.close_db() + + def test_non_incognito_preferences_are_persisted(self, tmp_path): + """Sanity: a normal (non-incognito) session DOES persist the preference.""" + agent_a = _build_agent(tmp_path) + try: + # _incognito defaults to False; explicitly confirm + agent_a._incognito = False + result = _invoke_set_priority_sender("visible@example.com") + assert result["ok"] is True, f"set_priority_sender failed: {result}" + finally: + agent_a.close_db() + + agent_b = _build_agent(tmp_path) + try: + assert ( + "visible@example.com" + in agent_b._session_preferences["priority_senders"] + ), ( + "Non-incognito session must persist preference to the memory store. " + f"Got: {agent_b._session_preferences['priority_senders']}" + ) + finally: + agent_b.close_db() + + +class TestMemoryDisabledFallback: + """When GAIA_MEMORY_DISABLED=1, preferences still work in-session but aren't persisted.""" + + def test_preferences_work_in_session_without_memory(self, tmp_path): + """GAIA_MEMORY_DISABLED=1 — preferences mutate in-memory but don't crash.""" + cfg = EmailAgentConfig( + gmail_backend=_MinimalMailBackend(), + calendar_backend=_MinimalCalendarBackend(), + db_path=str(tmp_path / "state.db"), + memory_db_path=str(tmp_path / "memory.db"), + silent_mode=True, + debug=False, + ) + + old = os.environ.get("GAIA_MEMORY_DISABLED") + os.environ["GAIA_MEMORY_DISABLED"] = "1" + try: + with patch("gaia.agents.base.agent.AgentSDK") as mock_sdk: + mock_sdk.return_value = MagicMock() + agent = EmailTriageAgent(config=cfg) + try: + # Memory is disabled — _memory_store should be None + assert agent._memory_store is None + + # Preference tools should still work (in-process mutation) + result = _invoke_set_priority_sender("boss@company.com") + assert result["ok"] is True, f"set_priority_sender failed: {result}" + assert ( + "boss@company.com" in agent._session_preferences["priority_senders"] + ) + finally: + agent.close_db() + finally: + if old is None: + del os.environ["GAIA_MEMORY_DISABLED"] + else: + os.environ["GAIA_MEMORY_DISABLED"] = old diff --git a/src/gaia/apps/webui/package-lock.json b/src/gaia/apps/webui/package-lock.json index 72e18373f..69ddcb120 100644 --- a/src/gaia/apps/webui/package-lock.json +++ b/src/gaia/apps/webui/package-lock.json @@ -1,12 +1,12 @@ { "name": "@amd-gaia/agent-ui", - "version": "0.21.0", + "version": "0.21.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@amd-gaia/agent-ui", - "version": "0.21.0", + "version": "0.21.1", "license": "MIT", "dependencies": { "electron-updater": "^6.8.3" diff --git a/src/gaia/apps/webui/package.json b/src/gaia/apps/webui/package.json index d29e459ef..f2dcb4c12 100644 --- a/src/gaia/apps/webui/package.json +++ b/src/gaia/apps/webui/package.json @@ -1,6 +1,6 @@ { "name": "@amd-gaia/agent-ui", - "version": "0.21.0", + "version": "0.21.1", "type": "module", "productName": "GAIA Agent UI", "description": "Privacy-first agentic AI interface with document Q&A - runs 100% locally on AMD Ryzen AI", diff --git a/src/gaia/apps/webui/src/App.tsx b/src/gaia/apps/webui/src/App.tsx index b060a7f68..441ef2eb5 100644 --- a/src/gaia/apps/webui/src/App.tsx +++ b/src/gaia/apps/webui/src/App.tsx @@ -80,6 +80,7 @@ function App() { setSystemStatus, setBackendConnected, setAgents, + setRunningSessions, } = useChatStore(); const showNotificationPanel = useNotificationStore((s) => s.showPanel); const setShowNotificationPanel = useNotificationStore((s) => s.setShowPanel); @@ -270,6 +271,25 @@ function App() { }; }, [setSessions, addSession, removeSession, updateSessionInList, setBackendConnected]); + // Poll which sessions have a running turn so the sidebar can show a + // "still running" spinner on backgrounded runs. Backend-truth + // (/api/chat/active), independent of any open SSE stream (#1580). + const activeRunsPollRef = useRef | null>(null); + useEffect(() => { + const poll = () => { + api.getActiveRuns() + .then((data) => setRunningSessions(data.session_ids || [])) + .catch(() => { /* non-critical — sidebar just won't show spinners */ }); + }; + poll(); + // 2.6s (off the :00/:30 marks) — responsive enough to feel live without + // hammering the backend. + activeRunsPollRef.current = setInterval(poll, 2_600); + return () => { + if (activeRunsPollRef.current) clearInterval(activeRunsPollRef.current); + }; + }, [setRunningSessions]); + // Support URL-based session navigation (?session= or #) useEffect(() => { if (currentSessionId) return; // Already have a session selected diff --git a/src/gaia/apps/webui/src/components/ChatView.tsx b/src/gaia/apps/webui/src/components/ChatView.tsx index 531fa7ef4..aac1447cd 100644 --- a/src/gaia/apps/webui/src/components/ChatView.tsx +++ b/src/gaia/apps/webui/src/components/ChatView.tsx @@ -280,7 +280,7 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr const abortRef = useRef(null); const stepIdRef = useRef(0); const toolOccurredRef = useRef(false); - const sendMessageRef = useRef<(text?: string) => void>(() => {}); + const sendMessageRef = useRef<(text?: string, options?: { attach?: boolean }) => void>(() => {}); // ── Streaming chunk buffer ────────────────────────────────────── // Buffer SSE chunks in a ref and flush to the store via rAF. @@ -471,6 +471,12 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr // Stop streaming — reads fresh state from store to avoid stale closures const handleStop = useCallback(() => { log.stream.warn('User stopped generation'); + // Tell the backend to cancel the run. Since runs now outlive the SSE + // connection (#1580), aborting the client alone only detaches us — the + // agent would keep generating in the background. The cancel endpoint + // sets the handler's cancelled flag so the producer bails at its next + // step boundary. + api.cancelStream(sessionId).catch(() => { /* best-effort */ }); if (abortRef.current) { abortRef.current.abort(); abortRef.current = null; @@ -632,7 +638,13 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr }, []); // Send message - const sendMessage = useCallback(async (overrideText?: string) => { + const sendMessage = useCallback(async (overrideText?: string, options?: { attach?: boolean }) => { + // attach=true re-subscribes to a run already in flight server-side + // (revisiting a backgrounded session, #1580). It reuses the entire + // stream-event handling below but skips composing/sending a new turn: + // no optimistic user message, no input/attachment handling, and the + // controller comes from api.attachToRun instead of api.sendMessageStream. + const attach = options?.attach === true; const text = (overrideText || input).trim(); const hasAttachments = attachments.length > 0 && attachments.some(a => a.uploaded); @@ -641,7 +653,10 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr isNearBottomRef.current = true; const isInitializing = systemStatus?.init_state === 'initializing'; - if ((!text && !hasAttachments) || isStreaming || isInitializing) { + if (attach) { + // Don't double-attach if a stream is already live in this view. + if (isStreaming) return; + } else if ((!text && !hasAttachments) || isStreaming || isInitializing) { if (!text && !hasAttachments) log.chat.debug('Send blocked: empty message'); if (isStreaming) log.chat.debug('Send blocked: already streaming'); if (isInitializing) log.chat.debug('Send blocked: system initializing'); @@ -650,43 +665,47 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr // Build message text with attachment references let messageText = text; - const uploadedAttachments = attachments.filter(a => a.uploaded && a.serverUrl); - if (uploadedAttachments.length > 0) { - const attachmentLines = uploadedAttachments.map(a => { - if (a.isImage) { - return `![${a.name}](${a.serverUrl})`; - } - return `[${a.name}](${a.serverUrl})`; - }).join('\n'); - messageText = messageText - ? `${messageText}\n\n${attachmentLines}` - : attachmentLines; - } + if (!attach) { + const uploadedAttachments = attachments.filter(a => a.uploaded && a.serverUrl); + if (uploadedAttachments.length > 0) { + const attachmentLines = uploadedAttachments.map(a => { + if (a.isImage) { + return `![${a.name}](${a.serverUrl})`; + } + return `[${a.name}](${a.serverUrl})`; + }).join('\n'); + messageText = messageText + ? `${messageText}\n\n${attachmentLines}` + : attachmentLines; + } - log.chat.info(`Sending message to session=${sessionId}`, { length: messageText.length, preview: messageText.slice(0, 80) }); + log.chat.info(`Sending message to session=${sessionId}`, { length: messageText.length, preview: messageText.slice(0, 80) }); - setInput(''); - if (inputRef.current) { - inputRef.current.style.height = 'auto'; - inputRef.current.focus(); - } + setInput(''); + if (inputRef.current) { + inputRef.current.style.height = 'auto'; + inputRef.current.focus(); + } - // Clear attachments - setAttachments(prev => { - prev.forEach(a => { if (a.url) URL.revokeObjectURL(a.url); }); - return []; - }); + // Clear attachments + setAttachments(prev => { + prev.forEach(a => { if (a.url) URL.revokeObjectURL(a.url); }); + return []; + }); - // Optimistic user message - const userMsg: Message = { - id: Date.now(), - session_id: sessionId, - role: 'user', - content: messageText, - created_at: new Date().toISOString(), - rag_sources: null, - }; - addMessage(userMsg); + // Optimistic user message + const userMsg: Message = { + id: Date.now(), + session_id: sessionId, + role: 'user', + content: messageText, + created_at: new Date().toISOString(), + rag_sources: null, + }; + addMessage(userMsg); + } else { + log.chat.info(`Re-attaching to background run for session=${sessionId}`); + } // Start streaming setStreaming(true); @@ -703,7 +722,7 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr let doneHandled = false; streamBufferRef.current = ''; - const controller = api.sendMessageStream(sessionId, messageText, { + const streamCallbacks: api.StreamCallbacks = { onChunk: (event) => { if (isStale()) return; // stop writing after a session switch (#1580) const content = event.content || ''; @@ -1046,7 +1065,9 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr }, 300); // Auto-title on first message - if (session && session.title === 'New Task') { + // Skip client-side auto-title when re-attaching (no user text in + // hand and the run's lifecycle already titles server-side, #1580). + if (!attach && session && session.title === 'New Task') { const autoTitle = text.slice(0, 50) + (text.length > 50 ? '...' : ''); api.updateSession(sessionId, { title: autoTitle }) .then(() => updateSessionInList(sessionId, { title: autoTitle })) @@ -1113,7 +1134,11 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr .then((data) => useChatStore.getState().setAgents(data.agents || [])) .catch(() => { /* non-critical */ }); }, - }, undefined, undefined, activeAgentId); + }; + + const controller = attach + ? api.attachToRun(sessionId, streamCallbacks) + : api.sendMessageStream(sessionId, messageText, streamCallbacks, undefined, undefined, activeAgentId); abortRef.current = controller; }, [input, attachments, isStreaming, sessionId, session, addMessage, setMessages, setStreaming, flushStreamBuffer, clearStreamContent, updateSessionInList, addAgentStep, updateLastAgentStep, appendThinkingContent, updateLastToolStep, clearAgentSteps, activeAgentId, addNotification, isStale]); @@ -1121,6 +1146,39 @@ export function ChatView({ sessionId, onCreateAgent, onAgentChange }: ChatViewPr // Keep ref in sync so event listeners always call the latest sendMessage sendMessageRef.current = sendMessage; + // Re-attach to an in-flight background run on mount (#1580). When the + // user revisits a session whose turn is still running server-side, hook + // back into its live stream so progress resumes in the view instead of + // sitting static until the run finishes. Once per session mount; the + // attach path no-ops if a stream is already live here. + const reattachedRef = useRef(false); + useEffect(() => { + reattachedRef.current = false; + }, [sessionId]); + useEffect(() => { + const attemptAttach = () => { + if (reattachedRef.current) return; + if (useChatStore.getState().isStreaming) return; + reattachedRef.current = true; + log.chat.info(`Resuming live view of background run for session=${sessionId}`); + sendMessageRef.current(undefined, { attach: true }); + }; + // Fast path: the global poll already knows this session is running. + if (useChatStore.getState().runningSessionIds.includes(sessionId)) { + attemptAttach(); + return; + } + // Otherwise confirm once with the backend on mount. + let cancelled = false; + api.getActiveRuns() + .then(({ session_ids }) => { + if (cancelled) return; + if (session_ids.includes(sessionId)) attemptAttach(); + }) + .catch(() => { /* non-critical — sidebar spinner still signals running */ }); + return () => { cancelled = true; }; + }, [sessionId]); + // Listen for programmatic message dispatches from rich-content // components (currently the EmailPreScanCard's Approve / Reply // buttons). Wired as a window-level CustomEvent rather than prop diff --git a/src/gaia/apps/webui/src/components/Sidebar.css b/src/gaia/apps/webui/src/components/Sidebar.css index 66fc8e7e1..5793b431d 100644 --- a/src/gaia/apps/webui/src/components/Sidebar.css +++ b/src/gaia/apps/webui/src/components/Sidebar.css @@ -335,6 +335,16 @@ vertical-align: middle; } +/* Spinner shown inline before the title while a session's agent is still + running in the background (#1580). Uses the global `spin` keyframe. */ +.session-running-spinner { + flex-shrink: 0; + margin-right: 4px; + color: var(--accent, var(--text-muted)); + vertical-align: middle; + animation: spin 1s linear infinite; +} + /* Session hash link -- short permalink for troubleshooting */ .session-hash { font-size: 9px; diff --git a/src/gaia/apps/webui/src/components/Sidebar.tsx b/src/gaia/apps/webui/src/components/Sidebar.tsx index a62660a4f..52527ea00 100644 --- a/src/gaia/apps/webui/src/components/Sidebar.tsx +++ b/src/gaia/apps/webui/src/components/Sidebar.tsx @@ -2,7 +2,7 @@ // SPDX-License-Identifier: MIT import { useState, useCallback, useRef, useEffect, useMemo } from 'react'; -import { Plus, Search, Settings, Sun, Moon, Trash2, PanelLeftClose, PanelLeftOpen, Smartphone, Brain, EyeOff, Clock } from 'lucide-react'; +import { Plus, Search, Settings, Sun, Moon, Trash2, PanelLeftClose, PanelLeftOpen, Smartphone, Brain, EyeOff, Clock, Loader2 } from 'lucide-react'; import { useChatStore } from '../stores/chatStore'; import * as api from '../services/api'; import { log } from '../utils/logger'; @@ -34,9 +34,10 @@ interface SidebarProps { } /** Extracted session row to share between grouped and flat rendering. */ -function SessionItem({ session: s, isActive, isPendingDelete, isDeleting, onSelect, onKeyDown, onDelete, formatTime }: { +function SessionItem({ session: s, isActive, isRunning, isPendingDelete, isDeleting, onSelect, onKeyDown, onDelete, formatTime }: { session: Session; isActive: boolean; + isRunning: boolean; isPendingDelete: boolean; isDeleting: boolean; onSelect: (id: string) => void; @@ -64,6 +65,13 @@ function SessionItem({ session: s, isActive, isPendingDelete, isDeleting, onSele > {s.private && } + {isRunning && ( + + )} {s.title} { - log.stream.info(`SSE connection opened -> HTTP ${res.status}`); - - if (!res.ok) { - const errText = await res.text().catch(() => ''); - log.stream.error(`SSE connection failed: HTTP ${res.status}`, errText); - callbacks.onError(new Error(`HTTP ${res.status}: ${errText}`)); - return; + .then((res) => consumeSSEResponse(res, callbacks, t)) + .catch((err) => { + if (err.name === 'AbortError') { + log.stream.warn('Stream aborted by user'); + } else { + log.stream.error('Stream fetch error', err); + callbacks.onError(err); } + }); - const reader = res.body?.getReader(); - if (!reader) { - log.stream.error('No response body reader available'); - callbacks.onError(new Error('No response body')); - return; - } + return controller; +} - const decoder = new TextDecoder(); - let buffer = ''; - let doneReceived = false; +/** + * Read and dispatch an SSE response body against a set of StreamCallbacks. + * Shared by ``sendMessageStream`` (POST /chat/send) and ``attachToRun`` + * (GET /chat/attach) so both parse events identically. + */ +async function consumeSSEResponse( + res: Response, + callbacks: StreamCallbacks, + t: ReturnType, +): Promise { + log.stream.info(`SSE connection opened -> HTTP ${res.status}`); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) { - log.stream.debug('SSE reader done (stream ended)'); - break; - } + if (!res.ok) { + const errText = await res.text().catch(() => ''); + log.stream.error(`SSE connection failed: HTTP ${res.status}`, errText); + callbacks.onError(new Error(`HTTP ${res.status}: ${errText}`)); + return; + } + + const reader = res.body?.getReader(); + if (!reader) { + log.stream.error('No response body reader available'); + callbacks.onError(new Error('No response body')); + return; + } + + const decoder = new TextDecoder(); + let buffer = ''; + let doneReceived = false; + let chunkCount = 0; + let totalChars = 0; + let agentEventCount = 0; - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - - for (const line of lines) { - if (line.startsWith('data: ')) { - const raw = line.slice(6).trim(); - if (!raw) continue; - try { - const event: StreamEvent = JSON.parse(raw); - - if (event.type === 'chunk') { - chunkCount++; - totalChars += (event.content || '').length; - if (chunkCount <= 3 || chunkCount % 50 === 0) { - log.stream.debug(`Chunk #${chunkCount} (+${(event.content || '').length} chars)`); - } - callbacks.onChunk(event); - } else if (event.type === 'answer') { - // Agent final answer - treat as content - callbacks.onChunk(event); - } else if (event.type === 'done') { - doneReceived = true; - log.stream.timed(`Stream complete: ${chunkCount} chunks, ${totalChars} chars, ${agentEventCount} agent events`, t); - callbacks.onDone(event); - } else if (event.type === 'error') { - log.stream.error(`Stream error event:`, event.content); - callbacks.onError(new Error(event.content || 'Unknown error')); - } else if (event.type === 'agent_created') { - log.stream.info(`Agent created: ${event.agent_id}`); - callbacks.onAgentCreated?.(event); - } else if (AGENT_EVENT_TYPES.has(event.type)) { - agentEventCount++; - log.stream.debug(`Agent event: ${event.type}`, event); - callbacks.onAgentEvent(event); - } else { - log.stream.warn(`Unknown SSE event type: ${event.type}`, event); - } - } catch (parseErr) { - log.stream.warn(`Malformed SSE data, skipping`, { raw: raw.slice(0, 100) }); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + log.stream.debug('SSE reader done (stream ended)'); + break; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.startsWith('data: ')) { + const raw = line.slice(6).trim(); + if (!raw) continue; + try { + const event: StreamEvent = JSON.parse(raw); + + if (event.type === 'chunk') { + chunkCount++; + totalChars += (event.content || '').length; + if (chunkCount <= 3 || chunkCount % 50 === 0) { + log.stream.debug(`Chunk #${chunkCount} (+${(event.content || '').length} chars)`); } + callbacks.onChunk(event); + } else if (event.type === 'answer') { + // Agent final answer - treat as content + callbacks.onChunk(event); + } else if (event.type === 'done') { + doneReceived = true; + log.stream.timed(`Stream complete: ${chunkCount} chunks, ${totalChars} chars, ${agentEventCount} agent events`, t); + callbacks.onDone(event); + } else if (event.type === 'error') { + log.stream.error(`Stream error event:`, event.content); + callbacks.onError(new Error(event.content || 'Unknown error')); + } else if (event.type === 'agent_created') { + log.stream.info(`Agent created: ${event.agent_id}`); + callbacks.onAgentCreated?.(event); + } else if (AGENT_EVENT_TYPES.has(event.type)) { + agentEventCount++; + log.stream.debug(`Agent event: ${event.type}`, event); + callbacks.onAgentEvent(event); + } else { + log.stream.warn(`Unknown SSE event type: ${event.type}`, event); } + } catch (parseErr) { + log.stream.warn(`Malformed SSE data, skipping`, { raw: raw.slice(0, 100) }); } } - } finally { - // Release the reader to free the underlying connection - reader.releaseLock(); } + } + } finally { + // Release the reader to free the underlying connection + reader.releaseLock(); + } - // Only signal completion if no explicit done event was received during the stream - if (!doneReceived) { - log.stream.timed(`SSE connection closed without done event: ${chunkCount} chunks, ${agentEventCount} agent events`, t); - callbacks.onDone({ type: 'done' }); - } - }) + // Only signal completion if no explicit done event was received during the stream + if (!doneReceived) { + log.stream.timed(`SSE connection closed without done event: ${chunkCount} chunks, ${agentEventCount} agent events`, t); + callbacks.onDone({ type: 'done' }); + } +} + +/** + * Re-attach to an in-flight background run and stream its events (#1580). + * + * Used when the user revisits a session whose turn is still running. The + * backend replays everything emitted so far, then streams live events. + * Returns an AbortController so the caller can detach on unmount — that + * does NOT cancel the run (it keeps going server-side), it only closes + * this client's view of it. + */ +export function attachToRun( + sessionId: string, + callbacks: StreamCallbacks, +): AbortController { + const controller = new AbortController(); + const t = log.stream.time(); + + log.stream.info(`Attaching to background run for session=${sessionId}`); + + fetch(`${API_BASE}/chat/attach?session_id=${encodeURIComponent(sessionId)}`, { + method: 'GET', + signal: controller.signal, + }) + .then((res) => consumeSSEResponse(res, callbacks, t)) .catch((err) => { if (err.name === 'AbortError') { - log.stream.warn(`Stream aborted by user after ${chunkCount} chunks`); + log.stream.warn('Run attach detached by client'); } else { - log.stream.error(`Stream fetch error`, err); + log.stream.error('Run attach fetch error', err); callbacks.onError(err); } }); @@ -572,6 +618,11 @@ export function sendMessageStream( return controller; } +/** List session ids that currently have a running chat turn (#1580). */ +export async function getActiveRuns(): Promise<{ session_ids: string[] }> { + return apiFetch('GET', '/chat/active'); +} + // -- Tool Confirmation --------------------------------------------------------- /** Resolve a pending tool execution confirmation (Allow or Deny). */ diff --git a/src/gaia/apps/webui/src/stores/__tests__/chatStore.test.ts b/src/gaia/apps/webui/src/stores/__tests__/chatStore.test.ts index 284b675a5..5325fa2f1 100644 --- a/src/gaia/apps/webui/src/stores/__tests__/chatStore.test.ts +++ b/src/gaia/apps/webui/src/stores/__tests__/chatStore.test.ts @@ -52,3 +52,41 @@ describe('chatStore streaming state', () => { expect(after.agentSteps).toEqual([]); }); }); + +describe('chatStore running-sessions registry (#1580)', () => { + beforeEach(() => { + useChatStore.setState({ runningSessionIds: [] }); + }); + + it('setRunningSessions stores the polled active set', () => { + useChatStore.getState().setRunningSessions(['a', 'b']); + expect(useChatStore.getState().runningSessionIds).toEqual(['a', 'b']); + }); + + it('setRunningSessions keeps a stable reference when the set is unchanged', () => { + useChatStore.getState().setRunningSessions(['a', 'b']); + const first = useChatStore.getState().runningSessionIds; + + // Same membership (order-insensitive) must not produce a new array, so + // the 2.6s poll doesn't re-render the sidebar every tick. + useChatStore.getState().setRunningSessions(['b', 'a']); + expect(useChatStore.getState().runningSessionIds).toBe(first); + }); + + it('setRunningSessions replaces the array when membership changes', () => { + useChatStore.getState().setRunningSessions(['a']); + const first = useChatStore.getState().runningSessionIds; + + useChatStore.getState().setRunningSessions(['a', 'c']); + const second = useChatStore.getState().runningSessionIds; + + expect(second).not.toBe(first); + expect(second).toEqual(['a', 'c']); + }); + + it('clears to empty when no runs are active', () => { + useChatStore.getState().setRunningSessions(['a']); + useChatStore.getState().setRunningSessions([]); + expect(useChatStore.getState().runningSessionIds).toEqual([]); + }); +}); diff --git a/src/gaia/apps/webui/src/stores/chatStore.ts b/src/gaia/apps/webui/src/stores/chatStore.ts index 80be1209f..ac304de89 100644 --- a/src/gaia/apps/webui/src/stores/chatStore.ts +++ b/src/gaia/apps/webui/src/stores/chatStore.ts @@ -35,6 +35,12 @@ interface ChatState { updateSessionInList: (id: string, updates: Partial) => void; addPendingDelete: (id: string) => void; removePendingDelete: (id: string) => void; + /** Session IDs with a chat turn still running server-side (#1580). + * Backend-truth (polled from /api/chat/active), so it survives + * navigating away, refresh, and revisit — drives the sidebar's + * "still running" spinner on backgrounded sessions. */ + runningSessionIds: string[]; + setRunningSessions: (ids: string[]) => void; // Messages (for current session) messages: Message[]; @@ -143,6 +149,17 @@ export const useChatStore = create((set, get) => ({ sessions: [], currentSessionId: null, pendingDeleteIds: [], + runningSessionIds: [], + setRunningSessions: (ids) => + set((state) => { + // Reference-stable update: skip the set() when the running set is + // unchanged so the polled refresh doesn't re-render the sidebar. + const prev = state.runningSessionIds; + if (prev.length === ids.length && prev.every((id) => ids.includes(id))) { + return state; + } + return { runningSessionIds: ids }; + }), setSessions: (sessions) => set((state) => ({ // Filter out any sessions that are pending backend deletion so poll diff --git a/src/gaia/ui/_chat_helpers.py b/src/gaia/ui/_chat_helpers.py index 77a4585d3..32a6e7bc5 100644 --- a/src/gaia/ui/_chat_helpers.py +++ b/src/gaia/ui/_chat_helpers.py @@ -1413,12 +1413,20 @@ def _do_chat(): # ── Streaming Chat ─────────────────────────────────────────────────────────── -async def _stream_chat_response(db: ChatDatabase, session: dict, request: ChatRequest): - """Stream chat response as Server-Sent Events. +async def _stream_chat_impl(run, db: ChatDatabase, session: dict, request: ChatRequest): + """Produce chat-response SSE events for a single run. Uses ChatAgent with SSEOutputHandler to emit agent activity events (steps, tool calls, thinking) alongside text chunks, giving the frontend visibility into what the agent is doing. + + This is the run *producer*: it is driven by ``_run_chat_lifecycle`` + inside a detached task that owns the run for its full duration, + independent of any HTTP/SSE client connection. Yielded ``data: ...`` + strings are buffered and fanned out to attached subscribers by the + lifecycle; DB persistence happens here regardless of whether a client + is still listening, so navigating away never loses the run (issue + #1580 follow-up). """ import queue @@ -1450,6 +1458,9 @@ def _cleanup_stream(): try: # Create SSE handler for streaming events sse_handler = SSEOutputHandler() + # Expose the handler on the run so an external Stop can signal the + # producer to bail even after every client has detached (#1580). + run.handler = sse_handler # Register so /api/chat/confirm-tool can find this handler. _active_sse_handlers[session_id] = sse_handler @@ -2390,6 +2401,84 @@ def _persist_policy_block_if_needed(): _cleanup_stream() +async def _run_chat_lifecycle( + run, db: ChatDatabase, session: dict, request: ChatRequest +): + """Drive a single chat run to completion, independent of any client. + + Runs inside a detached task owned by ``RunManager``. Pumps every SSE + event from ``_stream_chat_impl`` into the run's replay buffer / live + subscribers via ``run.emit``. Because this task is not the HTTP + response, a client disconnect cannot cancel it — the producer finishes + and persists server-side (#1580 follow-up). + """ + async for data in _stream_chat_impl(run, db, session, request): + run.emit(data) + # Notify the AgentLoop that a user turn completed (best-effort; no-op if + # the autonomy loop isn't running). Fires on real run completion rather + # than on subscriber detach. + try: + from gaia.ui.agent_loop import agent_loop + + agent_loop.notify_user_message(request.session_id) + except Exception: # pylint: disable=broad-except + pass + + +async def _stream_chat_response(db: ChatDatabase, session: dict, request: ChatRequest): + """Stream a chat turn as SSE for the ``/api/chat/send`` client. + + Starts the run's detached lifecycle (the chat router guarantees no run + is already active for this session — it returns 409 otherwise) and + subscribes this HTTP connection to it. Yields the replay buffer (empty + for a fresh run) followed by live events until the run completes or the + client disconnects. Disconnecting only detaches this subscriber; the + run keeps going in the background. + """ + from gaia.ui.run_manager import DONE, run_manager + + session_id = request.session_id + run = run_manager.get(session_id) + if run is None: + run = run_manager.start( + session_id, + lambda r: _run_chat_lifecycle(r, db, session, request), + ) + q = run.subscribe() + try: + while True: + item = await q.get() + if item is DONE: + break + yield item + finally: + run.unsubscribe(q) + + +async def _attach_chat_stream(session_id: str): + """Re-attach an SSE client to an already-running background run (#1580). + + Used by ``GET /api/chat/attach`` when the user revisits a session whose + turn is still in flight. Replays everything emitted so far, then streams + live events to completion. The caller is responsible for returning 404 + when no run is active. + """ + from gaia.ui.run_manager import DONE, run_manager + + run = run_manager.get(session_id) + if run is None: + return + q = run.subscribe() + try: + while True: + item = await q.get() + if item is DONE: + break + yield item + finally: + run.unsubscribe(q) + + # ── Document Indexing ──────────────────────────────────────────────────────── diff --git a/src/gaia/ui/routers/chat.py b/src/gaia/ui/routers/chat.py index c034d09c2..e6007d54c 100644 --- a/src/gaia/ui/routers/chat.py +++ b/src/gaia/ui/routers/chat.py @@ -22,6 +22,7 @@ from ..database import ChatDatabase from ..dependencies import get_db from ..models import ChatRequest, ChatResponse +from ..run_manager import run_manager from ..sse_handler import ( _RAG_RESULT_JSON_SUB_RE, _THOUGHT_JSON_SUB_RE, @@ -89,7 +90,14 @@ async def send_message( # Reject overlapping turns for the same session. Force-releasing an # asyncio.Lock held by another coroutine is unsafe because the lock # has no ownership tracking. - if session_lock.locked(): + # + # The lock guards the synchronous request window; ``run_manager`` guards + # the *background tail* — a streaming run keeps going (and persisting) + # after the client disconnects and the HTTP lock is released (#1580), so + # a new turn for the same session must also be rejected while that + # background run is still active, or it would corrupt the cached agent's + # conversation state. + if session_lock.locked() or run_manager.is_running(sid): raise HTTPException( status_code=409, detail="A chat request is already in progress for this session. " @@ -136,10 +144,12 @@ async def _release_stream_resources(): async def _stream(): db.add_message(request.session_id, "user", request.message) + # The run's detached lifecycle owns producer + persistence and + # fires the AgentLoop notify on real completion; this subscriber + # just relays buffered + live events to the browser. Detaching + # (client disconnect) no longer cancels the run (#1580). async for chunk in srv._stream_chat_response(db, session, request): yield chunk - # Notify AgentLoop after the streaming response completes - _notify_loop(request.session_id) sem_released = True return StreamingResponse( @@ -253,3 +263,42 @@ async def cancel_stream(request: CancelStreamRequest): ) handler.cancelled.set() return {"status": "ok", "cancelled": True} + + +@router.get("/api/chat/active") +async def list_active_runs(): + """Return the session ids with a currently-running chat turn. + + The Agent UI polls this to render a "still running" indicator on + backgrounded sessions in the sidebar — runs continue server-side after + the user navigates away, so this is the source of truth independent of + any open SSE connection (#1580 follow-up). + """ + return {"session_ids": run_manager.active_sessions()} + + +@router.get("/api/chat/attach") +async def attach_stream(session_id: str): + """Re-attach to an in-flight background run and stream its events (SSE). + + Used when the user revisits a session whose turn is still running. The + response replays every event emitted so far, then streams live events + to completion. No session lock is taken — the originating ``/send`` run + already owns the turn; this is a read-only subscriber. + """ + if not run_manager.is_running(session_id): + raise HTTPException( + status_code=404, + detail="No active run for this session.", + ) + + srv = _server_mod() + return StreamingResponse( + srv._attach_chat_stream(session_id), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/gaia/ui/routers/sessions.py b/src/gaia/ui/routers/sessions.py index fa6b436d6..2c560fd6d 100644 --- a/src/gaia/ui/routers/sessions.py +++ b/src/gaia/ui/routers/sessions.py @@ -138,6 +138,12 @@ async def delete_session( """Delete a session and its messages.""" if not db.delete_session(session_id): raise HTTPException(status_code=404, detail="Session not found") + # Cancel any background run for this session before tearing it down — + # runs now outlive the SSE connection (#1580), so a run left going would + # try to persist its answer to a session that no longer exists. + from ..run_manager import run_manager + + run_manager.cancel(session_id) # Remove the per-session lock to prevent memory leaks http_request.app.state.session_locks.pop(session_id, None) # Evict the cached ChatAgent for this session so a fresh one is created diff --git a/src/gaia/ui/run_manager.py b/src/gaia/ui/run_manager.py new file mode 100644 index 000000000..eab8b39b5 --- /dev/null +++ b/src/gaia/ui/run_manager.py @@ -0,0 +1,185 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT + +"""Per-session background chat-run registry for the GAIA Agent UI. + +A chat turn used to live and die with the SSE HTTP connection: when the +user clicked **New Task** (or switched sessions) mid-stream, the browser +aborted the SSE, the server cancelled the streaming generator, and the +in-flight answer was discarded with no record it had ever been running +(issue #1580 follow-up). + +``RunManager`` makes each turn a first-class ``Run`` that owns its own +lifecycle independent of any HTTP connection: + +* The agent producer + response persistence run inside a detached + ``asyncio`` task, so a client disconnect no longer cancels or loses the + run — it finishes server-side and persists to the DB. +* Every SSE event the run emits is appended to a replay ``buffer`` and + fanned out to any number of live subscriber queues. A browser can + *attach* to an in-flight run (on revisit) and receive the full history + followed by live events. +* ``active_sessions()`` is the source of truth the sidebar polls to show + which sessions are still running. + +Threading note: ``Run.emit`` / ``subscribe`` / ``finish`` only mutate the +buffer and subscriber list, and are only ever called from coroutines on +the server event loop (the lifecycle task and the subscriber generators). +They never ``await`` between reading and writing that state, so no extra +locking is needed — the single-threaded event loop guarantees atomicity. +The agent itself runs in a separate producer thread that communicates via +the handler's thread-safe ``queue.Queue``; that boundary is unchanged. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Awaitable, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Sentinel pushed to a subscriber queue when the run is complete so the +# subscriber generator knows to stop awaiting and close cleanly. +DONE = object() + + +class Run: + """A single in-flight (or just-finished) chat turn for one session.""" + + def __init__(self, session_id: str): + self.session_id = session_id + # The run's SSEOutputHandler, set by the lifecycle once constructed. + # Held so an external cancel can signal the producer to bail. + self.handler = None + # Append-only log of every SSE ``data: ...`` string emitted so far. + # New subscribers replay this before receiving live events. + self.buffer: List[str] = [] + # Live subscriber queues. emit() fans out to each; finish() closes them. + self.subscribers: List[asyncio.Queue] = [] + self.done = asyncio.Event() + self.task: Optional[asyncio.Task] = None + + # ── Producer side (called from the lifecycle task) ──────────────── + def emit(self, data: str) -> None: + """Record an SSE event and fan it out to all live subscribers.""" + self.buffer.append(data) + for q in self.subscribers: + q.put_nowait(data) + + def finish(self) -> None: + """Mark the run complete and close out every live subscriber.""" + self.done.set() + for q in self.subscribers: + q.put_nowait(DONE) + self.subscribers.clear() + + # ── Consumer side (called from subscriber generators) ───────────── + def subscribe(self) -> asyncio.Queue: + """Register a subscriber. + + Returns a queue preloaded with the full replay buffer. If the run + has already finished, a ``DONE`` sentinel is appended so the caller + drains the history then stops; otherwise the queue stays registered + to receive live events. + """ + q: asyncio.Queue = asyncio.Queue() + for item in self.buffer: + q.put_nowait(item) + if self.done.is_set(): + q.put_nowait(DONE) + else: + self.subscribers.append(q) + return q + + def unsubscribe(self, q: asyncio.Queue) -> None: + """Drop a subscriber (e.g. its client disconnected). + + The run keeps going regardless — this only stops fanning events to + the departed client. + """ + try: + self.subscribers.remove(q) + except ValueError: + pass + + +class RunManager: + """Registry of active chat runs, keyed by session id.""" + + def __init__(self): + self._runs: Dict[str, Run] = {} + + def is_running(self, session_id: str) -> bool: + return session_id in self._runs + + def get(self, session_id: str) -> Optional[Run]: + return self._runs.get(session_id) + + def active_sessions(self) -> List[str]: + """Session ids with a currently-running turn (sidebar source of truth).""" + return list(self._runs.keys()) + + def start( + self, + session_id: str, + make_lifecycle: Callable[[Run], Awaitable[None]], + ) -> Run: + """Create a run and launch its detached lifecycle task. + + ``make_lifecycle`` receives the ``Run`` and returns the coroutine + that drives the producer and emits events via ``run.emit``. The + coroutine runs to completion regardless of whether any client is + still attached. + + Raises: + RuntimeError: if a run is already active for this session. The + caller (chat router) must guard with ``is_running`` first and + return 409 — overlapping turns on one session would corrupt + the cached agent's conversation state. + """ + if session_id in self._runs: + raise RuntimeError( + f"A run is already active for session {session_id[:8]}; " + "refusing to start an overlapping turn." + ) + run = Run(session_id) + self._runs[session_id] = run + + async def _drive() -> None: + try: + await make_lifecycle(run) + except asyncio.CancelledError: + logger.info("Run cancelled for session %s", session_id[:8]) + raise + except Exception: # pylint: disable=broad-except + # The lifecycle owns its own user-facing error emission; this + # is the backstop so a bug there can't wedge the registry. + logger.exception( + "Unhandled error in run lifecycle for session %s", + session_id[:8], + ) + finally: + run.finish() + self._runs.pop(session_id, None) + + run.task = asyncio.create_task(_drive()) + return run + + def cancel(self, session_id: str) -> bool: + """Request cancellation of an active run (explicit Stop button). + + Sets the handler's cancelled flag; the producer observes it at its + next step boundary and tears down. Returns False if no run is + active for the session. + """ + run = self._runs.get(session_id) + if run is None: + return False + if run.handler is not None: + run.handler.cancelled.set() + return True + + +# Process-wide singleton used by the chat router and chat helpers. +run_manager = RunManager() diff --git a/src/gaia/ui/server.py b/src/gaia/ui/server.py index 5c211f5d9..227e54620 100644 --- a/src/gaia/ui/server.py +++ b/src/gaia/ui/server.py @@ -38,6 +38,7 @@ # expose these names at module level. The canonical implementations live # in ``_chat_helpers`` (shared by both server.py and the router modules). # pylint: disable=unused-import +from ._chat_helpers import _attach_chat_stream # noqa: F401 from ._chat_helpers import _build_history_pairs # noqa: F401 from ._chat_helpers import _compute_allowed_paths # noqa: F401 from ._chat_helpers import _get_chat_response # noqa: F401 diff --git a/src/gaia/version.py b/src/gaia/version.py index aae339efd..10cbe074c 100644 --- a/src/gaia/version.py +++ b/src/gaia/version.py @@ -6,7 +6,7 @@ import subprocess from importlib.metadata import version as get_package_version_metadata -__version__ = "0.21.0" +__version__ = "0.21.1" # Lemonade version used across CI and installer LEMONADE_VERSION = "10.2.0" diff --git a/tests/unit/chat/ui/test_chat_active_attach.py b/tests/unit/chat/ui/test_chat_active_attach.py new file mode 100644 index 000000000..692bbc492 --- /dev/null +++ b/tests/unit/chat/ui/test_chat_active_attach.py @@ -0,0 +1,110 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT + +"""Endpoint tests for the background-run surface (#1580 follow-up). + +Covers ``GET /api/chat/active`` (sidebar running-indicator source) and +``GET /api/chat/attach`` (revisit reconnect), plus the new overlap guard +that rejects a second turn while a run is still active in the registry. +""" + +from unittest.mock import patch + +import pytest +from fastapi.testclient import TestClient + +from gaia.ui.run_manager import run_manager +from gaia.ui.server import create_app + + +@pytest.fixture +def app(): + return create_app(db_path=":memory:") + + +@pytest.fixture +def client(app): + return TestClient(app, raise_server_exceptions=False) + + +@pytest.fixture(autouse=True) +def _clean_registry(): + """Each test starts and ends with an empty run registry.""" + run_manager._runs.clear() + yield + run_manager._runs.clear() + + +@pytest.fixture +def session_id(client): + return client.post("/api/sessions", json={}).json()["id"] + + +class _FakeRun: + """Stand-in registry entry — enough for is_running / active_sessions.""" + + def __init__(self, session_id): + self.session_id = session_id + self.handler = None + + +def test_active_empty_by_default(client): + resp = client.get("/api/chat/active") + assert resp.status_code == 200 + assert resp.json() == {"session_ids": []} + + +def test_active_reports_running_sessions(client): + run_manager._runs["s1"] = _FakeRun("s1") + run_manager._runs["s2"] = _FakeRun("s2") + + resp = client.get("/api/chat/active") + assert resp.status_code == 200 + assert set(resp.json()["session_ids"]) == {"s1", "s2"} + + +def test_attach_404_when_not_running(client): + resp = client.get("/api/chat/attach", params={"session_id": "nope"}) + assert resp.status_code == 404 + + +def test_attach_streams_existing_run(client): + """Attach replays a run's buffered events then closes on DONE.""" + run_manager._runs["live"] = _FakeRun("live") + + async def fake_attach(session_id): + assert session_id == "live" + yield 'data: {"type": "chunk", "content": "hi"}\n\n' + yield 'data: {"type": "done", "content": "hi"}\n\n' + + with patch("gaia.ui.server._attach_chat_stream", fake_attach): + resp = client.get("/api/chat/attach", params={"session_id": "live"}) + assert resp.status_code == 200 + body = resp.text + assert '"type": "chunk"' in body + assert '"type": "done"' in body + + +def test_send_409_when_run_already_active(client, session_id): + """A new turn is rejected while a background run is still registered.""" + run_manager._runs[session_id] = _FakeRun(session_id) + + resp = client.post( + "/api/chat/send", + json={"session_id": session_id, "message": "hi", "stream": False}, + ) + assert resp.status_code == 409 + assert "already in progress" in resp.json()["detail"] + + +def test_delete_session_cancels_active_run(client, session_id): + """Deleting a session cancels its run so it can't persist to a dead session.""" + import threading + + fake = _FakeRun(session_id) + fake.handler = type("H", (), {"cancelled": threading.Event()})() + run_manager._runs[session_id] = fake + + resp = client.delete(f"/api/sessions/{session_id}") + assert resp.status_code == 200 + assert fake.handler.cancelled.is_set() diff --git a/tests/unit/chat/ui/test_run_manager.py b/tests/unit/chat/ui/test_run_manager.py new file mode 100644 index 000000000..720e2acc2 --- /dev/null +++ b/tests/unit/chat/ui/test_run_manager.py @@ -0,0 +1,179 @@ +# Copyright(C) 2025-2026 Advanced Micro Devices, Inc. All rights reserved. +# SPDX-License-Identifier: MIT + +"""Unit tests for the per-session background chat-run registry (#1580). + +These exercise ``RunManager``/``Run`` in isolation — no agent, no LLM, no +HTTP. The lifecycle coroutine is a stub that emits a few SSE strings, so we +can assert the registry's contract: runs outlive subscriber disconnects, +late subscribers get a full replay, completion closes subscribers and +deregisters the run, and overlapping starts are rejected. +""" + +import asyncio + +import pytest + +from gaia.ui.run_manager import DONE, Run, RunManager + + +class _FakeHandler: + """Minimal stand-in for SSEOutputHandler — only ``cancelled`` is used.""" + + def __init__(self): + import threading + + self.cancelled = threading.Event() + + +async def _drain(q: asyncio.Queue) -> list: + """Drain a subscriber queue up to (and excluding) the DONE sentinel.""" + out = [] + while True: + item = await q.get() + if item is DONE: + break + out.append(item) + return out + + +async def test_run_completes_and_deregisters(): + mgr = RunManager() + + async def lifecycle(run: Run): + run.handler = _FakeHandler() + run.emit("data: a\n\n") + run.emit("data: b\n\n") + + run = mgr.start("sess-1", lifecycle) + assert mgr.is_running("sess-1") + assert "sess-1" in mgr.active_sessions() + + q = run.subscribe() + events = await _drain(q) + + assert events == ["data: a\n\n", "data: b\n\n"] + await run.task # ensure the lifecycle's finally has run + assert not mgr.is_running("sess-1") + assert run.done.is_set() + + +async def test_run_survives_subscriber_disconnect(): + """A run keeps emitting + completing after its only subscriber detaches.""" + mgr = RunManager() + gate = asyncio.Event() + + async def lifecycle(run: Run): + run.emit("data: first\n\n") + await gate.wait() # hold until the test detaches the subscriber + run.emit("data: after-disconnect\n\n") + + run = mgr.start("sess-2", lifecycle) + q = run.subscribe() + assert await q.get() == "data: first\n\n" + + # Simulate the browser navigating away: drop the subscriber mid-run. + run.unsubscribe(q) + assert run.subscribers == [] + + # The run must continue and finish regardless. + gate.set() + await run.task + assert run.done.is_set() + assert not mgr.is_running("sess-2") + # The post-disconnect event still landed in the replay buffer. + assert "data: after-disconnect\n\n" in run.buffer + + +async def test_late_subscriber_gets_full_replay(): + """Attaching after events were emitted replays history then live events.""" + mgr = RunManager() + emitted_two = asyncio.Event() + release = asyncio.Event() + + async def lifecycle(run: Run): + run.emit("data: 1\n\n") + run.emit("data: 2\n\n") + emitted_two.set() + await release.wait() + run.emit("data: 3\n\n") + + run = mgr.start("sess-3", lifecycle) + await emitted_two.wait() + + # Attach late — should replay 1 & 2, then receive live 3. + q = run.subscribe() + release.set() + events = await _drain(q) + + assert events == ["data: 1\n\n", "data: 2\n\n", "data: 3\n\n"] + await run.task + + +async def test_subscribe_after_done_terminates_immediately(): + mgr = RunManager() + + async def lifecycle(run: Run): + run.emit("data: only\n\n") + + run = mgr.start("sess-4", lifecycle) + await run.task # run fully finished before anyone subscribes + + q = run.subscribe() + events = await _drain(q) + assert events == ["data: only\n\n"] + # No lingering registration for a finished run. + assert run.subscribers == [] + + +async def test_overlapping_start_raises(): + mgr = RunManager() + release = asyncio.Event() + + async def lifecycle(run: Run): + await release.wait() + + mgr.start("sess-5", lifecycle) + with pytest.raises(RuntimeError): + mgr.start("sess-5", lifecycle) + + release.set() + + +async def test_cancel_sets_handler_flag(): + mgr = RunManager() + started = asyncio.Event() + release = asyncio.Event() + + async def lifecycle(run: Run): + run.handler = _FakeHandler() + started.set() + await release.wait() + + run = mgr.start("sess-6", lifecycle) + await started.wait() + + assert mgr.cancel("sess-6") is True + assert run.handler.cancelled.is_set() + + release.set() + await run.task + + # Cancelling an unknown / finished session is a safe no-op. + assert mgr.cancel("nope") is False + + +async def test_active_sessions_reflects_lifecycle(): + mgr = RunManager() + release = asyncio.Event() + + async def lifecycle(run: Run): + await release.wait() + + mgr.start("a", lifecycle) + mgr.start("b", lifecycle) + assert set(mgr.active_sessions()) == {"a", "b"} + + release.set() + await asyncio.gather(*(r.task for r in [mgr.get("a"), mgr.get("b")] if r)) + assert mgr.active_sessions() == []