Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 109 additions & 52 deletions crates/buzz-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,99 @@ use uuid::Uuid;

use buzz_core::StoredEvent;

/// Outcome of [`nip33_stale_write_guard`].
///
/// The value tells the caller whether it may go on to insert the incoming
/// event. Ignoring it would defeat the guard, hence `#[must_use]`.
#[must_use = "the caller must abandon the write when the outcome is `Dominated`"]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StaleWrite {
    /// An equal-or-newer event already exists for the coordinate; the incoming
    /// event is stale and the caller must abandon the write.
    Dominated,
    /// The incoming event wins. Any older events for the coordinate have been
    /// soft-deleted; the caller should proceed to insert.
    Proceed,
}

/// Derive the advisory-lock key for a NIP-33 coordinate.
///
/// 64-bit FNV-1a over the little-endian bytes of `kind_i32`, then
/// `pubkey_bytes`, then the UTF-8 bytes of `d_tag`. The hash depends only on
/// its inputs, so every process computes the same key for the same coordinate
/// and concurrent writers serialize on it.
fn nip33_advisory_lock_key(kind_i32: i32, pubkey_bytes: &[u8], d_tag: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    let kind_bytes = kind_i32.to_le_bytes();
    let hash = kind_bytes
        .iter()
        .chain(pubkey_bytes)
        .chain(d_tag.as_bytes())
        .fold(FNV_OFFSET_BASIS, |h, &b| {
            (h ^ u64::from(b)).wrapping_mul(FNV_PRIME)
        });

    // `pg_advisory_xact_lock` takes a signed 64-bit key; the cast deliberately
    // reinterprets the hash bits rather than range-checking them.
    hash as i64
}

/// Decide whether the incoming `(created_at, incoming_id)` loses to the
/// existing `(existing_ts, existing_id)`.
///
/// An older `created_at` always loses. On an equal timestamp the lowest id
/// wins, so the incoming event loses when its id is greater than or equal to
/// the existing one (an identical id is a replay and is treated as stale).
fn nip33_incoming_is_dominated(
    created_at: DateTime<Utc>,
    incoming_id: &[u8],
    existing_ts: DateTime<Utc>,
    existing_id: &[u8],
) -> bool {
    match created_at.cmp(&existing_ts) {
        std::cmp::Ordering::Less => true,
        std::cmp::Ordering::Equal => incoming_id >= existing_id,
        std::cmp::Ordering::Greater => false,
    }
}

/// Enforce NIP-33 parameterized-replaceable semantics for one event inside an
/// already-open transaction.
///
/// Acquires the per-coordinate advisory lock (`pg_advisory_xact_lock`, released
/// on commit/rollback), finds the current live event for `(kind, pubkey, d_tag)`,
/// and compares it against the incoming `(created_at, id)`:
///
/// * newest `created_at` wins; same-second ties are broken by lowest `id`
///   (byte order, which matches the lowercase-hex ordering NIP-01 specifies).
/// * if the incoming event loses, returns [`StaleWrite::Dominated`] and makes no
///   changes.
/// * otherwise soft-deletes the older event(s) and returns [`StaleWrite::Proceed`].
/// * if no live event exists for the coordinate, returns
///   [`StaleWrite::Proceed`] without modifying any rows.
///
/// Shared by [`Db::replace_parameterized_event`] (normal store path) and the
/// relay's `persist_command_event` so both apply identical replacement rules to
/// kind 30000–39999 events. Keep this the single source of truth — divergent
/// NIP-33 replacement logic is a silent-correctness hazard.
///
/// # Errors
///
/// Returns an error if acquiring the advisory lock, the lookup query, or the
/// soft-delete update fails.
pub async fn nip33_stale_write_guard(
    conn: &mut sqlx::PgConnection,
    kind_i32: i32,
    pubkey_bytes: &[u8],
    d_tag: &str,
    created_at: DateTime<Utc>,
    incoming_id: &[u8],
) -> Result<StaleWrite> {
    // Serialize concurrent writers for this coordinate before reading state.
    sqlx::query("SELECT pg_advisory_xact_lock($1)")
        .bind(nip33_advisory_lock_key(kind_i32, pubkey_bytes, d_tag))
        .execute(&mut *conn)
        .await?;

    // Current live event for the coordinate (if any). The ordering mirrors the
    // replacement rule, so the first row is the current winner.
    let existing: Option<(DateTime<Utc>, Vec<u8>)> = sqlx::query_as(
        "SELECT created_at, id FROM events \
         WHERE kind = $1 AND pubkey = $2 AND d_tag = $3 AND deleted_at IS NULL \
         ORDER BY created_at DESC, id ASC LIMIT 1",
    )
    .bind(kind_i32)
    .bind(pubkey_bytes)
    .bind(d_tag)
    .fetch_optional(&mut *conn)
    .await?;

    if let Some((existing_ts, existing_id)) = existing {
        if nip33_incoming_is_dominated(created_at, incoming_id, existing_ts, &existing_id) {
            return Ok(StaleWrite::Dominated);
        }

        // Incoming wins — retire the older event(s) for this coordinate.
        sqlx::query(
            "UPDATE events SET deleted_at = NOW() \
             WHERE kind = $1 AND pubkey = $2 AND d_tag = $3 AND deleted_at IS NULL",
        )
        .bind(kind_i32)
        .bind(pubkey_bytes)
        .bind(d_tag)
        .execute(&mut *conn)
        .await?;
    }

    Ok(StaleWrite::Proceed)
}

/// Extract p-tag mentions from an event and insert into the `event_mentions` table.
///
/// Called after event insertion. Failures are logged but do not block event storage.
Expand Down Expand Up @@ -1088,6 +1181,7 @@ impl Db {
/// Create a new workflow.
pub async fn create_workflow(
&self,
id: Uuid,
channel_id: Option<Uuid>,
owner_pubkey: &[u8],
name: &str,
Expand All @@ -1096,6 +1190,7 @@ impl Db {
) -> Result<Uuid> {
workflow::create_workflow(
&self.pool,
id,
channel_id,
owner_pubkey,
name,
Expand Down Expand Up @@ -1650,68 +1745,30 @@ impl Db {
let created_at = chrono::DateTime::from_timestamp(created_at_secs, 0)
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;

// Stable advisory-lock key: FNV-1a over (kind, pubkey, d_tag).
// Same algorithm as replace_addressable_event — deterministic across processes.
let lock_key = {
let mut h: u64 = 0xcbf29ce484222325; // FNV offset basis
for b in kind_i32.to_le_bytes() {
h ^= b as u64;
h = h.wrapping_mul(0x100000001b3);
}
for b in pubkey_bytes.as_slice() {
h ^= *b as u64;
h = h.wrapping_mul(0x100000001b3);
}
for b in d_tag.as_bytes() {
h ^= *b as u64;
h = h.wrapping_mul(0x100000001b3);
}
h as i64
};

let mut tx = self.pool.begin().await?;

sqlx::query("SELECT pg_advisory_xact_lock($1)")
.bind(lock_key)
.execute(&mut *tx)
.await?;

// Check for existing event with same (kind, pubkey, d_tag).
let existing: Option<(chrono::DateTime<chrono::Utc>, Vec<u8>)> = sqlx::query_as(
"SELECT created_at, id FROM events \
WHERE kind = $1 AND pubkey = $2 AND d_tag = $3 AND deleted_at IS NULL \
ORDER BY created_at DESC, id ASC LIMIT 1",
)
.bind(kind_i32)
.bind(pubkey_bytes.as_slice())
.bind(d_tag)
.fetch_optional(&mut *tx)
.await?;

// Stale-write protection: reject if incoming is not newer.
// Take the advisory lock and apply NIP-33 replacement (shared with the
// relay's command path via `nip33_stale_write_guard`).
let incoming_id = event.id.as_bytes().as_slice();
if let Some((existing_ts, existing_id)) = existing {
let dominated = created_at < existing_ts
|| (created_at == existing_ts && incoming_id >= existing_id.as_slice());
if dominated {
match nip33_stale_write_guard(
tx.as_mut(),
kind_i32,
pubkey_bytes.as_slice(),
d_tag,
created_at,
incoming_id,
)
.await?
{
StaleWrite::Dominated => {
tx.rollback().await?;
let received_at = chrono::Utc::now();
return Ok((
StoredEvent::with_received_at(event.clone(), received_at, channel_id, false),
false,
));
}

// Soft-delete the older event(s).
sqlx::query(
"UPDATE events SET deleted_at = NOW() \
WHERE kind = $1 AND pubkey = $2 AND d_tag = $3 AND deleted_at IS NULL",
)
.bind(kind_i32)
.bind(pubkey_bytes.as_slice())
.bind(d_tag)
.execute(&mut *tx)
.await?;
StaleWrite::Proceed => {}
}

// Insert the new event inside the transaction.
Expand Down
3 changes: 1 addition & 2 deletions crates/buzz-db/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,13 @@ pub struct ApprovalRecord {
/// New workflows start as `active` and `enabled = TRUE`.
pub async fn create_workflow(
pool: &PgPool,
id: Uuid,
channel_id: Option<Uuid>,
owner_pubkey: &[u8],
name: &str,
definition_json: &str,
definition_hash: &[u8],
) -> Result<Uuid> {
let id = Uuid::new_v4();

sqlx::query(
r#"
INSERT INTO workflows
Expand Down
103 changes: 87 additions & 16 deletions crates/buzz-relay/src/handlers/command_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ async fn persist_command_event(
None
};

// NIP-33 stale-write protection for parameterized-replaceable command kinds
// (only KIND_WORKFLOW_DEF today). Shares the relay-wide replacement logic with
// Db::replace_parameterized_event so command and normal store paths can't drift.
// A dominated (older/retried) event is treated as an idempotent duplicate so
// the handler skips its mutation.
if let Some(ref d_tag) = d_tag {
match buzz_db::nip33_stale_write_guard(
tx.as_mut(),
kind_i32,
pubkey_bytes.as_slice(),
d_tag,
created_at,
id_bytes.as_slice(),
)
.await
.map_err(|e| IngestError::Internal(format!("error: nip33 stale-write guard: {e}")))?
{
buzz_db::StaleWrite::Dominated => return Ok(PersistResult::Duplicate),
buzz_db::StaleWrite::Proceed => {}
}
}

let result = sqlx::query(
r#"
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag)
Expand Down Expand Up @@ -568,6 +590,13 @@ async fn handle_workflow_def(
IngestError::Rejected("invalid: missing workflow name (name or d tag)".into())
})?;

// Parse the d-tag as the workflow_id UUID
let d_tag = extract_d_tag(event)
.ok_or_else(|| IngestError::Rejected("invalid: missing d tag (workflow ID)".into()))?;
let workflow_id = Uuid::parse_str(&d_tag).map_err(|_| {
IngestError::Rejected("invalid: workflow ID in d tag must be a valid UUID".into())
})?;

// 2. Validate caller has channel access (minimum: is a member)
let is_member = state
.is_member_cached(channel_id, &self_bytes)
Expand All @@ -579,6 +608,31 @@ async fn handle_workflow_def(
));
}

// Check if workflow already exists to perform update or create checks
let existing = state.db.get_workflow(workflow_id).await;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔍 Ordering is load-bearing (info-leak). The is_member_cached gate above (line 580) runs before this lookup on purpose, so unauthorized users can't probe for valid workflow UUIDs via differing error responses. Please don't hoist this lookup earlier for efficiency — it silently reintroduces the leak.

let existing_record = match existing {
Ok(record) => {
if record.owner_pubkey != self_bytes {
return Err(IngestError::Rejected(
"forbidden: cannot update a workflow owned by another user".into(),
));
}
if record.channel_id != Some(channel_id) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⛓️ Channel immutability + legacy NULL. record.channel_id != Some(channel_id) is deliberate: it rejects both some→some channel moves and the legacy NULL→some case. A naive unwrap/compare would let legacy global workflows be silently re-scoped.

return Err(IngestError::Rejected(
"forbidden: cannot change the channel of an existing workflow".into(),
));
}
Some(record)
}
Err(buzz_db::DbError::NotFound(_)) => None,
Err(e) => {
return Err(IngestError::Internal(format!(
"error: db lookup workflow: {e}"
)));
}
};
let is_update = existing_record.is_some();

// 3. Parse YAML from event.content
let (def, definition_json_str) = buzz_workflow::WorkflowEngine::parse_yaml(&event.content)
.map_err(|e| IngestError::Rejected(format!("invalid: workflow YAML parse error: {e}")))?;
Expand All @@ -588,9 +642,17 @@ async fn handle_workflow_def(

// Generate webhook secret if this workflow uses a Webhook trigger
let webhook_secret = if matches!(def.trigger, buzz_workflow::TriggerDef::Webhook) {
let secret = webhook_secret::generate_webhook_secret();
webhook_secret::inject_secret(&mut definition_json, &secret);
Some(secret)
let existing_secret = existing_record

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔑 Webhook secret preservation. On update we re-inject the existing secret from the stored row (returning None so it isn't rotated/re-surfaced). Previously every edit regenerated _webhook_secret, breaking existing webhook callers.

.as_ref()
.and_then(|r| crate::webhook_secret::extract_secret(&r.definition));
if let Some(secret) = existing_secret {
webhook_secret::inject_secret(&mut definition_json, &secret);
None
} else {
let secret = webhook_secret::generate_webhook_secret();
webhook_secret::inject_secret(&mut definition_json, &secret);
Some(secret)
}
} else {
None
};
Expand All @@ -612,20 +674,29 @@ async fn handle_workflow_def(
PersistResult::Inserted(tx) => tx,
};

// 4. Execute: create_workflow
let workflow_id = state
.db
.create_workflow(
Some(channel_id),
&self_bytes,
&workflow_name,
&definition_json_final,
&hash,
)
.await
.map_err(|e| IngestError::Internal(format!("error: db create_workflow: {e}")))?;
// 4. Execute: update_workflow or create_workflow
if is_update {
state
.db
.update_workflow(workflow_id, &workflow_name, &definition_json_final, &hash)
.await
.map_err(|e| IngestError::Internal(format!("error: db update_workflow: {e}")))?;
Comment on lines +679 to +683

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Ignore stale workflow definition replays

When an older kind:30620 event for the same workflow is delivered after a newer one (for example after a reconnect/retry, while still inside the relay's timestamp window), this unconditional update_workflow overwrites the workflows table with the stale YAML. The command path uses persist_command_event rather than the NIP-33 replace_parameterized_event stale-write check, so clients will still see the newer event as the latest while the workflow engine reads and runs the older definition from the DB. Please compare the incoming event's (created_at, id) against the current live coordinate before applying the update, or route workflow defs through the same replacement logic.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Addressed in efa13ed.

The command path now goes through the same NIP-33 replacement logic as the normal store path. Rather than duplicate the check, I extracted the lock + (created_at, id) dominance test + soft-delete into a shared buzz_db::nip33_stale_write_guard(&mut conn, …) and call it from both Db::replace_parameterized_event and persist_command_event, so the two paths cannot drift.

  • A dominated (older/retried) event now returns PersistResult::Duplicate, so the handler skips update_workflow entirely — the stale YAML never reaches the DB, and clients no longer see a newer event than the engine runs.
  • Among command kinds, only KIND_WORKFLOW_DEF (30620) is parameterized-replaceable (DM/approval kinds are 41xxx/46xxx), so behavior is unchanged for the rest.
  • Covered by test_workflow_update_and_delete (passing against a branch-built relay). Note: Nostr created_at is second-granularity, so same-second edits are resolved by the id tie-break — documented inline where the test sleeps 1s to stay deterministic.

👍 Useful catch — thanks.

} else {
state
.db
.create_workflow(
workflow_id,
Some(channel_id),
&self_bytes,
&workflow_name,
&definition_json_final,
&hash,
)
.await
.map_err(|e| IngestError::Internal(format!("error: db create_workflow: {e}")))?;
}

// Commit: event + workflow creation succeeded atomically.
// Commit: event + workflow creation/update succeeded atomically.
tx.commit()
.await
.map_err(|e| IngestError::Internal(format!("error: commit transaction: {e}")))?;
Expand Down
24 changes: 18 additions & 6 deletions crates/buzz-relay/src/handlers/side_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1586,12 +1586,24 @@ async fn handle_a_tag_deletion(event: &Event, state: &Arc<AppState>) -> anyhow::
buzz_core::kind::KIND_WORKFLOW_DEF => {
// Try UUID first (workflow_id); fall back to name-based lookup.
if let Ok(wf_id) = uuid::Uuid::parse_str(d_tag) {
state
.db
.delete_workflow(wf_id)
.await
.map_err(|e| anyhow::anyhow!("failed to delete workflow {wf_id}: {e}"))?;
tracing::info!(workflow_id = %wf_id, "Workflow deleted via NIP-09 a-tag (UUID)");
let owner_bytes = hex::decode(pubkey_hex).unwrap_or_default();
match state.db.get_workflow(wf_id).await {
Ok(wf) => {
if wf.owner_pubkey != owner_bytes {
return Err(anyhow::anyhow!("forbidden: deletion owner mismatch"));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔒 Security (deletion forgery). NIP-09 a-tag deletion previously only checked the actor matched the coordinate pubkey, then deleted by UUID. A forged 30620:<attacker>:<victim_uuid> could delete another user's workflow. We now load the row and assert owner_pubkey before deleting. This extra lookup is the ownership gate, not a redundant query.

}
state.db.delete_workflow(wf_id).await.map_err(|e| {
anyhow::anyhow!("failed to delete workflow {wf_id}: {e}")
})?;
tracing::info!(workflow_id = %wf_id, "Workflow deleted via NIP-09 a-tag (UUID)");
}
Err(buzz_db::DbError::NotFound(_)) => {
tracing::warn!("NIP-09 a-tag deletion: no workflow '{wf_id}' found");
}
Err(e) => {
return Err(anyhow::anyhow!("failed to lookup workflow for delete: {e}"));
}
}
} else {
// Name-based lookup
let owner_bytes = hex::decode(pubkey_hex).unwrap_or_default();
Expand Down
Loading
Loading