Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions crates/buzz-acp/src/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,20 @@ pub struct AcpClient {
observer_agent_index: Option<usize>,
/// Best-effort context attached to raw ACP wire events.
observer_context: ObserverContext,
/// Goose-specific: most recently observed `_meta.goose.activeRunId` from
/// a `session/update` notification of kind `session_info_update`.
/// Most recently observed `_meta.goose.activeRunId` from a
/// `session/update` notification of kind `session_info_update`.
///
/// Goose emits this whenever it starts or clears an active prompt run
/// Both goose and buzz-agent emit `session_info_update` with this field;
/// goose emits it whenever it starts or clears an active prompt run
/// (`crates/goose/src/acp/server.rs:2277` `send_active_run_update`).
/// Required as `expectedRunId` when calling the non-standard
/// `_goose/unstable/session/steer` method to inject a message into an
/// in-flight turn without cancelling it.
///
/// `None` until the first `session_info_update` arrives, or after the
/// run clears (goose emits `activeRunId: null` at end of turn). Other
/// agents will simply never populate this — readers must treat `None`
/// as "no active run to steer into" and fall back to cancel+merge.
/// run clears (goose/buzz-agent emit `activeRunId: null` at end of turn).
/// Other agents may leave this unset — readers must treat `None` as
/// "no active run to steer into" and fall back to cancel+merge.
active_run_id: Option<String>,
/// Per-turn channel for receiving goose-native non-cancelling steer
/// requests from the main loop. Installed by
Expand Down Expand Up @@ -490,8 +491,9 @@ impl AcpClient {
/// Most recently observed goose `_meta.goose.activeRunId` from a
/// `session_info_update`, if any.
///
/// Goose-only: other agents leave this `None` for the lifetime of the
/// client. Read directly by `read_until_response_with_idle_timeout`'s
/// Both goose and buzz-agent emit `session_info_update`; other agents
/// leave this `None` for the lifetime of the client. Read directly by
/// `read_until_response_with_idle_timeout`'s
/// steer arm at write time (see [`crate::pool::SteerRequest`] for
/// why the read loop owns this); production callers do not need this
/// accessor. Kept as `pub` so tests can introspect the field.
Expand Down Expand Up @@ -1266,11 +1268,11 @@ impl AcpClient {
false
}
"session_info_update" => {
// Goose-only as of writing: `_meta.goose.activeRunId` carries
// the id of the currently-active prompt run, or `null` when
// the run has cleared. Other agents don't emit this field;
// for them `active_run_id` stays `None` and steer callers
// will fall back to cancel+merge.
// Both goose and buzz-agent emit `session_info_update` with
// `_meta.goose.activeRunId`: the id of the currently-active
// prompt run, or `null` when the run has cleared. Other agents
// don't emit this field; for them `active_run_id` stays `None`
// and steer callers will fall back to cancel+merge.
//
// Per the ACP `SessionInfoUpdate` schema, `_meta` is a field
// on the update object itself — nested inside `update`, not
Expand Down
14 changes: 14 additions & 0 deletions crates/buzz-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ impl RunCtx<'_> {
);
}

if !response.reasoning.is_empty() {
wire::send(
self.wire,
wire::session_update(
self.session_id,
json!({
"sessionUpdate": "agent_thought_chunk",
"content": { "type": "text", "text": &response.reasoning }
}),
),
)
.await;
}

if !response.text.is_empty() {
wire::send(
self.wire,
Expand Down
49 changes: 48 additions & 1 deletion crates/buzz-agent/src/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ fn databricks_v2_path(route: DatabricksV2Route) -> &'static str {

fn parse_responses(v: Value) -> Result<LlmResponse, AgentError> {
let mut text = String::new();
let mut reasoning = String::new();
let mut tool_calls = Vec::new();
let mut saw_function_call = false;

Expand Down Expand Up @@ -663,7 +664,28 @@ fn parse_responses(v: Value) -> Result<LlmResponse, AgentError> {
args,
)?);
}
// Reasoning items are opaque/internal; we don't replay them.
Some("reasoning") => {
// Reasoning summary items from the Responses API. Each item has a
// `summary` array of `{"type": "summary_text", "text": "..."}` objects.
for s in item
.get("summary")
.and_then(Value::as_array)
.into_iter()
.flatten()
{
if matches!(
s.get("type").and_then(Value::as_str),
Some("summary_text" | "text")
) {
if let Some(t) = s.get("text").and_then(Value::as_str) {
if !reasoning.is_empty() {
reasoning.push('\n');
}
reasoning.push_str(t);
}
}
}
}
// Unknown types ignored for forward-compat.
_ => {}
}
Expand Down Expand Up @@ -691,6 +713,7 @@ fn parse_responses(v: Value) -> Result<LlmResponse, AgentError> {
tool_calls,
stop,
input_tokens,
reasoning,
})
}

Expand Down Expand Up @@ -760,6 +783,7 @@ fn parse_anthropic(v: Value) -> Result<LlmResponse, AgentError> {
let stop = map_stop(v.get("stop_reason").and_then(Value::as_str));
let mut tool_calls = Vec::new();
let mut text = String::new();
let mut reasoning = String::new();
if let Some(blocks) = v.get("content").and_then(Value::as_array) {
for b in blocks {
match b.get("type").and_then(Value::as_str) {
Expand All @@ -768,6 +792,15 @@ fn parse_anthropic(v: Value) -> Result<LlmResponse, AgentError> {
text.push_str(t);
}
}
Some("thinking") => {
// Anthropic extended thinking block: `{"type": "thinking", "thinking": "..."}`
if let Some(t) = b.get("thinking").and_then(Value::as_str) {
if !reasoning.is_empty() {
reasoning.push('\n');
}
reasoning.push_str(t);
}
}
Some("tool_use") => tool_calls.push(make_tool_call(
str_field(b, "id"),
str_field(b, "name"),
Expand All @@ -783,6 +816,7 @@ fn parse_anthropic(v: Value) -> Result<LlmResponse, AgentError> {
tool_calls,
stop,
input_tokens,
reasoning,
})
}

Expand All @@ -797,6 +831,18 @@ fn parse_openai(v: Value) -> Result<LlmResponse, AgentError> {
.get("message")
.ok_or_else(|| AgentError::Llm("missing message".into()))?;
let text = str_field(msg, "content");
// DeepSeek and vLLM-style OpenAI-compat hosts expose reasoning tokens on the
// message object. Prefer `reasoning_content` (DeepSeek's field name); fall
// back to `reasoning` (some other providers). Both are absent for standard
// OpenAI responses, which leaves this empty without any special-casing.
let reasoning = {
let rc = str_field(msg, "reasoning_content");
if rc.is_empty() {
str_field(msg, "reasoning")
} else {
rc
}
};
let mut tool_calls = Vec::new();
if let Some(arr) = msg.get("tool_calls").and_then(Value::as_array) {
for tc in arr {
Expand All @@ -819,6 +865,7 @@ fn parse_openai(v: Value) -> Result<LlmResponse, AgentError> {
tool_calls,
stop,
input_tokens,
reasoning,
})
}

Expand Down
9 changes: 9 additions & 0 deletions crates/buzz-agent/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ pub struct LlmResponse {
/// tokens, so reading it alone would undercount). Used to gate handoff on
/// the real token budget rather than a byte estimate.
pub input_tokens: Option<u64>,
/// Reasoning/thinking content emitted by the model before its answer, if
/// any. Non-empty when the provider returns extended-thinking tokens:
///
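/// - Responses API: newline-joined `summary[].text` from `type == "reasoning"` output items.
/// - Anthropic: newline-joined `thinking` from `type == "thinking"` content blocks.
/// - OpenAI chat/completions: `message.reasoning_content`, falling back to
///   `message.reasoning` when that is empty (DeepSeek / vLLM-style
///   OpenAI-compatible hosts); empty for standard OpenAI responses.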
///
/// Empty string when the provider returned no reasoning content.
pub reasoning: String,
}

#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down
Loading
Loading