From 230ded92c76c0bbae656475c2c5288b0d10f5e6d Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Wed, 10 Jun 2026 10:36:22 -0400 Subject: [PATCH 1/5] feat(spam-stream): per-signer concurrent sends drive_stream sent serially (send_one().await per spec), capping throughput at ~one RPC round-trip per tx (~20/s). Replace it with a per-signer worker pool: spawn one worker per pool signer, route each spec to the worker that owns its signer (round-robin by idx, matching make_strict_call's signer selection), and let workers send concurrently. Each signer's sends stay serial within its worker, so nonce assignment and reclaim-on-rejection (reuse the nonce, no gap) remain correct with no shared nonce state or locks. Concurrency == pool size; a pool of 1 reproduces the original serial behavior. Workers build/sign against a shared Arc with a worker-local nonce (bypassing prepare_tx_request's &mut), and the gas price is shared via an atomic refreshed once per interval. Validated relaying interop executing messages: at a sustained rate above the serial ceiling, cross-chain inclusion latency dropped from p50 80s (queue-bound) to p50 4s with a 32-signer pool. --- crates/cli/src/commands/spam_stream.rs | 327 ++++++++++++++++--------- 1 file changed, 205 insertions(+), 122 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index b1bf958e..ad43c248 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -16,9 +16,10 @@ use crate::{ }; use alloy::{ consensus::TxType, - network::{AnyTxEnvelope, Ethereum, NetworkTransactionBuilder}, + network::{AnyTxEnvelope, Ethereum, EthereumWallet, NetworkTransactionBuilder}, primitives::{utils::format_ether, U256}, providers::Provider, + signers::local::PrivateKeySigner, transports::http::reqwest::Url, }; use clap::Args; @@ -26,8 +27,11 @@ use contender_core::{ agent_controller::{AgentClass, AgentStore}, db::{DbOps, SpamDuration, SpamRunRequest}, generator::{ - agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, - util::parse_value, FunctionCallDefinition, Generator, PlanConfig, RandSeed, + agent_pools::AgentSpec, + seeder::rand_seed::SeedGenerator, + templater::Templater, + util::{complete_tx_request, parse_value}, + FunctionCallDefinition, Generator, PlanConfig, RandSeed, }, spammer::tx_actor::{ActorContext, CacheTx}, test_scenario::{TestScenario, TestScenarioParams}, @@ -39,7 +43,10 @@ use serde::Serialize; use std::{ collections::HashMap, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use tokio::{ @@ -59,6 +66,9 @@ const OUTPUT_VERSION: u32 = 1; /// How often to refresh the cached gas price during the stream loop. Avoids an /// RPC round-trip on every single tx. const GAS_REFRESH_INTERVAL: Duration = Duration::from_secs(6); +/// Gas limit used when a stream spec doesn't set one (stream mode doesn't +/// estimate per tx; relay specs always carry an explicit gas_limit). +const DEFAULT_STREAM_GAS_LIMIT: u64 = 200_000; /// Versioned, tagged envelope written to stdout (one JSON line per event) so /// downstream consumers can evolve with the schema. The `version` field pins @@ -283,19 +293,61 @@ fn build_pool_agent_store(from_pool: &str, pool_size: usize, seed: &RandSeed) -> store } -/// Drive the stream loop: pull specs, build/sign/send txs, cache in the -/// tx_actor for receipt tracking. Returns when the stream channel closes. +/// Drive the stream loop with per-signer concurrency. One worker is spawned per +/// pool signer; each spec is routed (round-robin by idx, matching +/// `make_strict_call`'s signer selection) to the worker that owns its signer, and +/// workers send concurrently. Because each signer's sends stay serial within its +/// worker, nonce assignment and reclaim-on-bounce remain correct with no shared +/// nonce state. Concurrency == pool size; a pool of 1 reproduces serial sending. async fn drive_stream( - scenario: &mut TestScenario, + scenario: Arc>, mut rx: mpsc::Receiver, fallback_pool: String, tps: u64, cancel: CancellationToken, ) -> Result<(usize, usize)> where - S: SeedGenerator + Send + Sync + Clone, - P: PlanConfig + Templater + Send + Sync + Clone, + S: SeedGenerator + Send + Sync + Clone + 'static, + P: PlanConfig + Templater + Send + Sync + Clone + 'static, { + // Shared, periodically-refreshed gas price: one RPC per interval rather than + // one per tx (the per-tx fetch was part of the serial throughput ceiling). + let gas_price = Arc::new(AtomicU64::new( + scenario.rpc_client.get_gas_price().await? as u64, + )); + let mut last_gas_refresh = Instant::now(); + + // One worker per pool signer; route so signer k is only ever touched by + // worker k (matches make_strict_call's `idx % signers.len()` selection), so + // each signer's nonce + reclaim stay serial and race-free. + let signers = scenario + .agent_store + .get_agent(&fallback_pool) + .map(|a| a.signers.clone()) + .unwrap_or_default(); + if signers.is_empty() { + return Err(CliError::Args(ArgsError::Custom(format!( + "pool '{fallback_pool}' has no signers" + )))); + } + let n_workers = signers.len(); + + let mut worker_txs = Vec::with_capacity(n_workers); + let mut worker_handles = Vec::with_capacity(n_workers); + for signer in signers.into_iter() { + let (wtx, wrx) = mpsc::channel::<(usize, FunctionCallDefinition)>(64); + let nonce = scenario.nonces.get(&signer.address()).copied().unwrap_or(0); + let handle = tokio::spawn(send_worker( + scenario.clone(), + wrx, + signer, + nonce, + gas_price.clone(), + )); + worker_txs.push(wtx); + worker_handles.push(handle); + } + // Rate limiter: only ticks when tps > 0. let mut ticker = if tps > 0 { let period = Duration::from_secs_f64(1.0 / tps as f64); @@ -306,15 +358,7 @@ where None }; - // Cache the gas price instead of fetching it per tx; refreshed below. - let mut gas_price = scenario.rpc_client.get_gas_price().await?; - let mut last_gas_refresh = Instant::now(); - - let mut sent: usize = 0; - let mut failed: usize = 0; let mut idx: usize = 0; - let placeholder_map = HashMap::::new(); - loop { tokio::select! { _ = cancel.cancelled() => { @@ -339,131 +383,159 @@ where if last_gas_refresh.elapsed() >= GAS_REFRESH_INTERVAL { if let Ok(gp) = scenario.rpc_client.get_gas_price().await { - gas_price = gp; + gas_price.store(gp as u64, Ordering::Relaxed); } last_gas_refresh = Instant::now(); } - match send_one(scenario, &spec, idx, &placeholder_map, gas_price).await { - Ok(true) => sent += 1, - Ok(false) => failed += 1, - Err(e) => { - failed += 1; - warn!("stream: failed to send tx (idx {idx}): {e}"); - } + // Route to the worker that owns this idx's signer. + let w = idx % n_workers; + if worker_txs[w].send((idx, spec)).await.is_err() { + break; } idx += 1; } } } + // Close worker inputs and join; sum their tallies. + drop(worker_txs); + let mut sent = 0usize; + let mut failed = 0usize; + for handle in worker_handles { + if let Ok((s, f)) = handle.await { + sent += s; + failed += f; + } + } Ok((sent, failed)) } -/// Build a single transaction from a stream spec, sign it, send it, and cache -/// it in the tx_actor for receipt tracking. -async fn send_one( - scenario: &mut TestScenario, - spec: &FunctionCallDefinition, - idx: usize, - placeholder_map: &HashMap, - gas_price: u128, -) -> Result +/// Per-signer send worker: pulls `(idx, spec)` for one signer, builds + signs +/// with that signer using a locally-tracked nonce, and sends. Serial within the +/// signer (so the nonce counter and reclaim are race-free); concurrency comes +/// from running one worker per signer. On a send rejection the nonce is reused +/// (not advanced), matching the serial path's reclaim — no gap, no stall. +async fn send_worker( + scenario: Arc>, + mut rx: mpsc::Receiver<(usize, FunctionCallDefinition)>, + signer: PrivateKeySigner, + mut nonce: u64, + gas_price: Arc, +) -> (usize, usize) where - S: SeedGenerator + Send + Sync + Clone, - P: PlanConfig + Templater + Send + Sync + Clone, + S: SeedGenerator + Send + Sync + Clone + 'static, + P: PlanConfig + Templater + Send + Sync + Clone + 'static, { - // Stream mode only builds EIP-1559 txs (no blob gas price, no auth list), so - // reject blob (4844) / setCode (7702) specs up front instead of silently - // producing an invalid tx. - if spec.blob_data.is_some() || spec.authorization_address.is_some() { - warn!("stream tx[{idx}]: blob/7702 specs are unsupported in stream mode; skipping"); - return Ok(false); - } - - // 1. Resolve `from`/`from_pool` and access list against the scenario's - // agent store + templater. This produces a strict FunctionCallDefinition. - let strict = scenario - .make_strict_call(spec, idx) - .map_err(contender_core::Error::Generator)?; - - // 2. Render the strict definition into a TransactionRequest (encodes - // calldata, threads access_list, sets value/gas_limit). - let tx_req = scenario - .get_templater() - .template_function_call(&strict, placeholder_map) - .map_err(contender_core::Error::Templater)?; - - // 3. Assign nonce/gas-limit + sign using the cached gas price. - let (prepared, wallet) = scenario.prepare_tx_request(&tx_req, gas_price, 0).await?; - // Build & sign via the alloy Ethereum network. The op-alloy-network re-export - // can create trait-resolution ambiguity, so we fully-qualify the trait - // method and convert the error string-ly instead of relying on From. - let envelope = - >::build( - prepared, &wallet, - ) - .await - .map_err(|e| CliError::Args(ArgsError::Custom(format!("build envelope: {e}"))))?; - let tx_hash = envelope.tx_hash().to_owned(); - - // 4. Send via the same txs_client the regular spammer uses. - let any_envelope = AnyTxEnvelope::Ethereum(envelope); - let start_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis(); - let res = scenario.txs_client.send_tx_envelope(any_envelope).await; - let error = match res { - Ok(_) => { - info!("stream tx[{idx}]: {tx_hash} sent"); - None - } - Err(e) => { - let msg = e - .as_error_resp() - .map(|err| err.message.to_string()) - .unwrap_or_else(|| format!("{e}")); - warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); - Some(msg) - } - }; - - let submitted = error.is_none(); - if !submitted { - // prepare_tx_request already advanced this account's local nonce, but - // the tx never entered the mempool (e.g. rejected by an interop access - // -list filter). Reclaim the nonce so the next send from this account - // doesn't leave a gap that stalls every later tx behind it. The stream - // sends serially, so nothing else has touched this account meanwhile. - if let Some(n) = scenario.nonces.get_mut(&strict.from) { - *n = n.saturating_sub(1); + let wallet = EthereumWallet::from(signer); + let placeholder_map = HashMap::::new(); + let mut sent = 0usize; + let mut failed = 0usize; + + while let Some((idx, spec)) = rx.recv().await { + // Stream mode only builds EIP-1559 txs; reject blob/7702 specs. + if spec.blob_data.is_some() || spec.authorization_address.is_some() { + warn!("stream tx[{idx}]: blob/7702 specs are unsupported in stream mode; skipping"); + failed += 1; + continue; } - } - // 5. Emit a structured result line to stdout (mirrors the input stream so - // reactive callers can correlate sends with their specs). - StreamEvent::emit(StreamPayload::TxResult { - idx, - tx_hash: tx_hash.to_string(), - start_timestamp_ms: start_ms, - kind: spec.kind.clone(), - error: error.clone(), - }); + let strict = match scenario.make_strict_call(&spec, idx) { + Ok(s) => s, + Err(e) => { + warn!("stream tx[{idx}]: build failed: {e}"); + failed += 1; + continue; + } + }; + let tx_req = match scenario + .get_templater() + .template_function_call(&strict, &placeholder_map) + { + Ok(t) => t, + Err(e) => { + warn!("stream tx[{idx}]: template failed: {e}"); + failed += 1; + continue; + } + }; - // 6. Cache in the tx_actor so its flush loop polls for the receipt. - scenario - .tx_actor() - .cache_run_tx(CacheTx { - tx_hash, + // Build + sign with a worker-local nonce (no &mut scenario, no shared + // nonce map), using the spec's gas limit instead of a per-tx estimate. + let gp = gas_price.load(Ordering::Relaxed) as u128; + let gas_limit = tx_req.gas.unwrap_or(DEFAULT_STREAM_GAS_LIMIT); + let priority_fee = tx_req.max_priority_fee_per_gas.unwrap_or(gp / 10); + let mut full_tx = tx_req; + full_tx.nonce = Some(nonce); + complete_tx_request( + &mut full_tx, + scenario.tx_type, + gp, + priority_fee, + gas_limit, + scenario.chain_id, + 0, + ); + let envelope = match >::build(full_tx, &wallet) + .await + { + Ok(e) => e, + Err(e) => { + warn!("stream tx[{idx}]: build envelope failed: {e}"); + failed += 1; + continue; + } + }; + let tx_hash = envelope.tx_hash().to_owned(); + let start_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let res = scenario + .txs_client + .send_tx_envelope(AnyTxEnvelope::Ethereum(envelope)) + .await; + let error = match res { + Ok(_) => { + info!("stream tx[{idx}]: {tx_hash} sent"); + nonce += 1; + sent += 1; + None + } + Err(e) => { + let msg = e + .as_error_resp() + .map(|err| err.message.to_string()) + .unwrap_or_else(|| format!("{e}")); + warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); + // Send rejected → tx never entered the mempool → reuse this nonce + // (don't advance) so the next send fills the slot without a gap. + failed += 1; + Some(msg) + } + }; + StreamEvent::emit(StreamPayload::TxResult { + idx, + tx_hash: tx_hash.to_string(), start_timestamp_ms: start_ms, - end_timestamp_ms: None, kind: spec.kind.clone(), - error, - }) - .await?; + error: error.clone(), + }); + let _ = scenario + .tx_actor() + .cache_run_tx(CacheTx { + tx_hash, + start_timestamp_ms: start_ms, + end_timestamp_ms: None, + kind: spec.kind.clone(), + error, + }) + .await; + } - Ok(submitted) + (sent, failed) } /// Top-level entry point invoked from `main.rs`. @@ -622,8 +694,11 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs, data_dir: &Path ctrlc_cancel.cancel(); }); + // Share the now-fully-initialized scenario across the per-signer send + // workers. All &mut setup (nonce sync, funding, actor ctx) is already done. + let scenario = Arc::new(scenario); let (sent, failed) = drive_stream( - &mut scenario, + scenario.clone(), rx, args.from_pool.clone(), args.tps, @@ -634,6 +709,14 @@ pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs, data_dir: &Path info!("stream complete: {sent} sent, {failed} failed; draining pending receipts..."); + // drive_stream has joined all workers, so their Arc clones are dropped and we + // can reclaim exclusive ownership for the receipt drain + shutdown. + let mut scenario = Arc::try_unwrap(scenario).map_err(|_| { + CliError::Args(ArgsError::Custom( + "scenario still shared after drive_stream returned".into(), + )) + })?; + tokio::select! { _ = scenario.dump_tx_cache(run_id) => {} _ = tokio::signal::ctrl_c() => { From e161dd422ca44e98d442018322edb3ee85c0c7e3 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Wed, 10 Jun 2026 10:47:50 -0400 Subject: [PATCH 2/5] test(spam-stream): cover signer routing + nonce reuse; document per-signer concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the two correctness invariants of the per-signer worker model into pure helpers and unit-test them (no RPC needed): - worker_index(idx, n): round-robin routing; assert idx and idx+n map to the same worker (so the same signer) and that it matches make_strict_call's idx % signers.len() pick — the property that keeps each signer serial. - next_nonce(nonce, submitted): advance on accept, reuse on rejection (no gap). Also assert pool signers are distinct (per-signer workers depend on it). Update docs/stream-mode.md: data-flow now shows the per-signer worker pool (concurrency == --pool-size, pool of 1 = serial) and the reuse table reflects workers building directly with a worker-local nonce instead of prepare_tx_request. --- crates/cli/src/commands/spam_stream.rs | 85 ++++++++++++++++++++++++-- docs/stream-mode.md | 16 +++-- 2 files changed, 92 insertions(+), 9 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index ad43c248..9902a271 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -293,6 +293,27 @@ fn build_pool_agent_store(from_pool: &str, pool_size: usize, seed: &RandSeed) -> store } +/// Worker (and pool-signer) index for a stream index. Specs are assigned +/// round-robin across the pool, matching `make_strict_call`'s `idx % +/// signers.len()` signer selection, so routing spec `idx` to `worker_index(idx, +/// n)` guarantees each signer is only ever touched by one worker — which is what +/// keeps that signer's nonce handling serial and correct. +fn worker_index(idx: usize, n_workers: usize) -> usize { + idx % n_workers +} + +/// Next local nonce after a send attempt. Advances on an accepted send; on a +/// rejected send the tx never entered the mempool, so the nonce is reused +/// (returned unchanged) to avoid a gap that would stall every later tx from the +/// signer. Correct only because each signer's sends are serial within its worker. +fn next_nonce(nonce: u64, submitted: bool) -> u64 { + if submitted { + nonce + 1 + } else { + nonce + } +} + /// Drive the stream loop with per-signer concurrency. One worker is spawned per /// pool signer; each spec is routed (round-robin by idx, matching /// `make_strict_call`'s signer selection) to the worker that owns its signer, and @@ -389,7 +410,7 @@ where } // Route to the worker that owns this idx's signer. - let w = idx % n_workers; + let w = worker_index(idx, n_workers); if worker_txs[w].send((idx, spec)).await.is_err() { break; } @@ -497,10 +518,10 @@ where .txs_client .send_tx_envelope(AnyTxEnvelope::Ethereum(envelope)) .await; + let submitted = res.is_ok(); let error = match res { Ok(_) => { info!("stream tx[{idx}]: {tx_hash} sent"); - nonce += 1; sent += 1; None } @@ -510,12 +531,14 @@ where .map(|err| err.message.to_string()) .unwrap_or_else(|| format!("{e}")); warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); - // Send rejected → tx never entered the mempool → reuse this nonce - // (don't advance) so the next send fills the slot without a gap. failed += 1; Some(msg) } }; + // Advance the nonce only on an accepted send; a rejected send never + // entered the mempool, so reuse the nonce (no gap that would stall the + // signer behind it). + nonce = next_nonce(nonce, submitted); StreamEvent::emit(StreamPayload::TxResult { idx, tx_hash: tx_hash.to_string(), @@ -903,4 +926,58 @@ mod tests { } assert_eq!(received.len(), 1); } + + #[test] + fn worker_index_is_round_robin_and_per_signer_stable() { + // The per-signer concurrency model is correct only if each signer is + // touched by exactly one worker. With n workers: one full cycle hits + // every worker once, idx and idx+n return to the same worker (so the + // same signer), and adjacent specs spread across workers. + let n = 4; + let mut cycle: Vec = (0..n).map(|i| worker_index(i, n)).collect(); + cycle.sort_unstable(); + cycle.dedup(); + assert_eq!( + cycle, + vec![0, 1, 2, 3], + "a full cycle hits each worker once" + ); + for idx in 0..10 { + assert_eq!(worker_index(idx, n), worker_index(idx + n, n)); + } + assert_ne!(worker_index(0, n), worker_index(1, n)); + } + + #[test] + fn worker_routing_matches_pool_signer_selection() { + // Routing must pick the same index make_strict_call uses for the signer + // (idx % signers.len()), so worker k always owns pool signer k. + let seed = RandSeed::seed_from_str("0xabc"); + let store = build_pool_agent_store("executors", 5, &seed); + let agent = store.get_agent("executors").expect("pool exists"); + let n = agent.signers.len(); + assert_eq!(n, 5); + for idx in 0..23usize { + assert_eq!(worker_index(idx, n), idx % agent.signers.len()); + } + } + + #[test] + fn next_nonce_reuses_on_rejection() { + // Accepted send advances the nonce; rejected send reuses it (no gap). + assert_eq!(next_nonce(7, true), 8); + assert_eq!(next_nonce(7, false), 7); + } + + #[test] + fn pool_signers_are_distinct() { + // Per-signer workers require the pool to have distinct signers. + let seed = RandSeed::seed_from_str("0xfeed"); + let store = build_pool_agent_store("executors", 8, &seed); + let mut addrs = store.all_signer_addresses(); + assert_eq!(addrs.len(), 8); + addrs.sort_unstable(); + addrs.dedup(); + assert_eq!(addrs.len(), 8, "pool signers must be distinct"); + } } diff --git a/docs/stream-mode.md b/docs/stream-mode.md index 27a1dc55..ab013f1a 100644 --- a/docs/stream-mode.md +++ b/docs/stream-mode.md @@ -90,12 +90,18 @@ stdin/file -> reader task -> mpsc drive_stream loop | v - for each spec: + drive_stream routes each spec round-robin to one worker per pool signer + (worker k owns signer k); workers send concurrently. Per worker: scenario.make_strict_call (Generator trait, resolves from_pool + access_list) - scenario.config.template_function_call (Templater, builds TransactionRequest) - scenario.prepare_tx_request (assigns nonce, gas limit, signs key from pool) - scenario.txs_client.send_tx_envelope + template_function_call (Templater, builds TransactionRequest) + build w/ worker-local nonce + complete_tx_request, sign with signer k + scenario.txs_client.send_tx_envelope (concurrent across signers) scenario.tx_actor().cache_run_tx (queues for receipt polling) + + Each signer's sends stay serial within its worker, so nonce assignment + and reclaim-on-rejection (reuse the nonce, no gap) are race-free with no + shared nonce state or locks. Concurrency == --pool-size; --pool-size 1 + reproduces the original serial sending. ``` Code map: @@ -137,7 +143,7 @@ consumers: | `TestScenario` constructor (signer map, nonce sync, txs_client) | yes | | `Generator::make_strict_call` (resolves `from_pool`, access list, EIP-7702) | yes | | `Templater::template_function_call` (calldata encoding, access list threading) | yes | -| `TestScenario::prepare_tx_request` (nonce, gas limit, complete_tx_request) | yes | +| `complete_tx_request` (gas fields) | yes — per-signer workers build directly with a worker-local nonce instead of `prepare_tx_request`, so each signer's nonce stays serial under concurrency | | Pool generation via `AgentPools::build_agent_store` | yes | | `TxActorHandle::cache_run_tx` + flush loop (DB writes, receipt polling) | yes | | `fund_accounts` helper | yes | From b0e961414fffb20d908a054543c83d6bd43c6934 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Wed, 10 Jun 2026 10:58:07 -0400 Subject: [PATCH 3/5] docs(stream-mode): resolve open questions, document pool-size as concurrency knob The per-signer concurrent-sends change resolves the last open question (concurrency bounded by pool size). Drop the resolved questions, keep the one real follow-up (Spammer-trait reuse) under Follow-ups, and note in the CLI table that --pool-size sets send concurrency (no new flag). --- docs/stream-mode.md | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/docs/stream-mode.md b/docs/stream-mode.md index ab013f1a..39143d83 100644 --- a/docs/stream-mode.md +++ b/docs/stream-mode.md @@ -40,8 +40,8 @@ Key flags: | `-p, --priv-key` | none | Funder key (funds the pool before spam starts). | | `--from` | `stdin` | `stdin` or a file path. | | `--from-pool` | `executors` | Pool name. Specs that omit `from`/`from_pool` use this pool. | -| `--pool-size` | `10` | Accounts generated in the pool. | -| `--tps` | `0` | `0` = consume as fast as channel emits. | +| `--pool-size` | `10` | Accounts generated in the pool. Also sets send concurrency: one worker per signer sends in parallel (pool of 1 = serial). | +| `--tps` | `0` | `0` = consume as fast as channel emits (sends run concurrently across the pool). | | `--min-balance` | `0.01 ETH` (wei) | Min pool-account balance during funding. | | `--skip-funding` | `false` | Skip pre-spam funding. | | `--seed` | random | Deterministic pool generation. | @@ -200,20 +200,8 @@ echo '{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit" The tx should land on the target chain; the funder needs at least enough ETH to fund the executor pool. -## Open questions - -1. Should stream mode get its own `Spammer` impl in `contender_core` so - campaigns can reuse it? Today the prototype lives entirely in `cli/`. - *(Deferred to a follow-up: refactoring the `Spammer` trait is out of scope - for the prototype.)* -2. ~~Is the JSON spec the right shape, or should we standardize on a tagged - envelope so we can evolve it later?~~ **Resolved:** the stdout output is now - a versioned, tagged envelope (`{"version":1,"type":"tx_result",...}`). See - "Structured output" above. -3. ~~How should errors propagate back to the upstream producer?~~ **Resolved:** - `spam-stream` emits a structured `tx_result` event per spec on stdout - (including send errors), plus `backpressure` and a terminal `summary` event, - in addition to the DB + logs. -4. Should `--tps 0` (drain-as-fast) bound concurrency by pool size, or is - "one in flight at a time" acceptable for the relayer case? *(Deferred: - parallel sends judged not worth the effort for the prototype.)* +## Follow-ups + +- Stream mode could grow its own `Spammer` impl in `contender_core` so campaigns + can reuse it (today it lives entirely in `cli/`); this needs a generic + `SpamSource` abstraction across the existing spammers. See "Reuse vs. new code". From 46eba766a823432ec79d2cf4c845472c7e833236 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Wed, 10 Jun 2026 11:01:38 -0400 Subject: [PATCH 4/5] docs(stream-mode): drop the Follow-ups section --- docs/stream-mode.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/stream-mode.md b/docs/stream-mode.md index 39143d83..f00437b3 100644 --- a/docs/stream-mode.md +++ b/docs/stream-mode.md @@ -199,9 +199,3 @@ echo '{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit" The tx should land on the target chain; the funder needs at least enough ETH to fund the executor pool. - -## Follow-ups - -- Stream mode could grow its own `Spammer` impl in `contender_core` so campaigns - can reuse it (today it lives entirely in `cli/`); this needs a generic - `SpamSource` abstraction across the existing spammers. See "Reuse vs. new code". From fcff89446418e8f53276bc37275be568b597f744 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Tue, 23 Jun 2026 09:58:34 -0400 Subject: [PATCH 5/5] fix clippy and stream worker backpressure --- crates/cli/src/commands/spam_stream.rs | 28 ++++++-- crates/cli/src/server/error.rs | 6 +- crates/cli/src/server/rpc_server/server.rs | 78 ++++++++++++---------- crates/core/src/flashblocks.rs | 15 +++-- 4 files changed, 75 insertions(+), 52 deletions(-) diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs index 9902a271..4f72840a 100644 --- a/crates/cli/src/commands/spam_stream.rs +++ b/crates/cli/src/commands/spam_stream.rs @@ -355,6 +355,7 @@ where let mut worker_txs = Vec::with_capacity(n_workers); let mut worker_handles = Vec::with_capacity(n_workers); + let mut worker_backpressured = vec![false; n_workers]; for signer in signers.into_iter() { let (wtx, wrx) = mpsc::channel::<(usize, FunctionCallDefinition)>(64); let nonce = scenario.nonces.get(&signer.address()).copied().unwrap_or(0); @@ -411,8 +412,22 @@ where // Route to the worker that owns this idx's signer. let w = worker_index(idx, n_workers); - if worker_txs[w].send((idx, spec)).await.is_err() { - break; + match worker_txs[w].try_send((idx, spec)) { + Ok(()) => worker_backpressured[w] = false, + Err(mpsc::error::TrySendError::Full((idx, spec))) => { + if !worker_backpressured[w] { + worker_backpressured[w] = true; + let capacity = worker_txs[w].max_capacity(); + StreamEvent::emit(StreamPayload::Backpressure { + queued: capacity.saturating_sub(worker_txs[w].capacity()), + capacity, + }); + } + if worker_txs[w].send((idx, spec)).await.is_err() { + break; + } + } + Err(mpsc::error::TrySendError::Closed(_)) => break, } idx += 1; } @@ -424,9 +439,12 @@ where let mut sent = 0usize; let mut failed = 0usize; for handle in worker_handles { - if let Ok((s, f)) = handle.await { - sent += s; - failed += f; + match handle.await { + Ok((s, f)) => { + sent += s; + failed += f; + } + Err(e) => warn!("stream: send worker panicked: {e}"), } } Ok((sent, failed)) diff --git a/crates/cli/src/server/error.rs b/crates/cli/src/server/error.rs index 636983ac..3dfb0d80 100644 --- a/crates/cli/src/server/error.rs +++ b/crates/cli/src/server/error.rs @@ -13,16 +13,16 @@ pub enum ContenderRpcError { SessionNotFound(usize), #[error("Session {} is not initialized", _0.id)] - SessionNotInitialized(ContenderSessionInfo), + SessionNotInitialized(Box), #[error("Session {} failed: {error}", info.id)] SessionFailed { - info: ContenderSessionInfo, + info: Box, error: String, }, #[error("Session {} is currently busy: {:?}", _0.id, _0.status)] - SessionBusy(ContenderSessionInfo), + SessionBusy(Box), #[error("Session {0} is not currently spamming")] SessionNotBusy(usize), diff --git a/crates/cli/src/server/rpc_server/server.rs b/crates/cli/src/server/rpc_server/server.rs index 2b0a0c1c..491ffb7d 100644 --- a/crates/cli/src/server/rpc_server/server.rs +++ b/crates/cli/src/server/rpc_server/server.rs @@ -424,43 +424,43 @@ impl ContenderRpcServer for ContenderServer { // Grab cached funding data under a brief read lock — available even // while the contender is taken out for spamming. - let (funder, agent, rpc_client) = - { - let sessions = self.sessions.read().await; - let Some(session) = sessions.get_session(session_id) else { - return Err(ContenderRpcError::SessionNotFound(session_id).into()); - }; - // Allow funding in any initialized state (Ready or Spamming). - match &session.info.status { - SessionStatus::Failed(msg) => { - return Err(ContenderRpcError::SessionFailed { - info: session.info.clone(), - error: msg.to_owned(), - } - .into()); - } - SessionStatus::Ready | SessionStatus::Spamming(_) => {} - _ => { - return Err( - ContenderRpcError::SessionNotInitialized(session.info.clone()).into(), - ); + let (funder, agent, rpc_client) = { + let sessions = self.sessions.read().await; + let Some(session) = sessions.get_session(session_id) else { + return Err(ContenderRpcError::SessionNotFound(session_id).into()); + }; + // Allow funding in any initialized state (Ready or Spamming). + match &session.info.status { + SessionStatus::Failed(msg) => { + return Err(ContenderRpcError::SessionFailed { + info: Box::new(session.info.clone()), + error: msg.to_owned(), } + .into()); + } + SessionStatus::Ready | SessionStatus::Spamming(_) => {} + _ => { + return Err(ContenderRpcError::SessionNotInitialized(Box::new( + session.info.clone(), + )) + .into()); } + } - let funder = session.funder.clone().ok_or_else(|| { - ContenderRpcError::SessionNotInitialized(session.info.clone()) - })?; - let rpc_client = session.rpc_client.clone().ok_or_else(|| { - ContenderRpcError::SessionNotInitialized(session.info.clone()) - })?; - let agent_store = session.agent_store.as_ref().ok_or_else(|| { - ContenderRpcError::SessionNotInitialized(session.info.clone()) - })?; - - let agent_class = params.agent_class.unwrap_or_default(); - let agent = agent_store.get_class(&agent_class).cloned(); - (funder, agent, rpc_client) - }; + let funder = session.funder.clone().ok_or_else(|| { + ContenderRpcError::SessionNotInitialized(Box::new(session.info.clone())) + })?; + let rpc_client = session.rpc_client.clone().ok_or_else(|| { + ContenderRpcError::SessionNotInitialized(Box::new(session.info.clone())) + })?; + let agent_store = session.agent_store.as_ref().ok_or_else(|| { + ContenderRpcError::SessionNotInitialized(Box::new(session.info.clone())) + })?; + + let agent_class = params.agent_class.unwrap_or_default(); + let agent = agent_store.get_class(&agent_class).cloned(); + (funder, agent, rpc_client) + }; let span = tracing::info_span!("session_fund_accounts", id = session_id); let sessions = Arc::clone(&self.sessions); @@ -509,16 +509,20 @@ fn error_if_session_not_ready(session: &ContenderSession) -> jsonrpsee::core::Rp let _: () = match &session.info.status { SessionStatus::Failed(msg) => { return Err(ContenderRpcError::SessionFailed { - info: session.info.clone(), + info: Box::new(session.info.clone()), error: msg.to_owned(), } .into()) } SessionStatus::Spamming(_) => { - return Err(ContenderRpcError::SessionBusy(session.info.clone()).into()) + return Err(ContenderRpcError::SessionBusy(Box::new(session.info.clone())).into()) } SessionStatus::Ready => (), - _ => return Err(ContenderRpcError::SessionNotInitialized(session.info.clone()).into()), + _ => { + return Err( + ContenderRpcError::SessionNotInitialized(Box::new(session.info.clone())).into(), + ) + } }; Ok(()) } diff --git a/crates/core/src/flashblocks.rs b/crates/core/src/flashblocks.rs index 3ac0acc9..a1ef819b 100644 --- a/crates/core/src/flashblocks.rs +++ b/crates/core/src/flashblocks.rs @@ -28,7 +28,7 @@ impl FlashblocksClient { .await .map_err(|e| FlashblocksError::ConnectionFailed { url: ws_url.to_owned(), - err: e, + err: Box::new(e), })?; Ok((ws_stream, response)) } @@ -47,7 +47,8 @@ impl FlashblocksClient { let timeout_duration = Duration::from_secs(10); let preflight_result: String = tokio::time::timeout(timeout_duration, async { while let Some(msg_result) = ws_stream.next().await { - let res = msg_result.map_err(FlashblocksError::PreflightRequestFailed)?; + let res = msg_result + .map_err(|e| FlashblocksError::PreflightRequestFailed(Box::new(e)))?; if let Some(text) = ws_message_to_text(res) { return Ok(text); } @@ -93,7 +94,7 @@ impl FlashblocksClient { while let Some(msg_result) = read.next().await { let msg = msg_result.map_err(|e| { cancel_token.cancel(); - FlashblocksError::ConnectionLost(e) + FlashblocksError::ConnectionLost(Box::new(e)) })?; if matches!(msg, Message::Close(_)) { @@ -183,18 +184,18 @@ pub enum FlashblocksError { #[error("Failed to connect to flashblocks WS endpoint {url}: {err}")] ConnectionFailed { - err: tokio_tungstenite::tungstenite::Error, + err: Box, url: Url, }, #[error("Flashblocks WS connection lost")] - ConnectionLost(tungstenite::Error), + ConnectionLost(Box), #[error("Flashblocks WS connection closed during preflight")] PreflightConnectionClosed, #[error("Flashblocks WS connection error during preflight")] - PreflightRequestFailed(tokio_tungstenite::tungstenite::Error), + PreflightRequestFailed(Box), #[error("Flashblocks WS endpoint did not send any data within {} seconds", _0.as_secs())] PreflightTimeout(std::time::Duration), @@ -203,7 +204,7 @@ pub enum FlashblocksError { PreflightInvalidResult(String), #[error("Failed to close write stream for Flashblocks WS endpoint")] - WriteStreamClose(tokio_tungstenite::tungstenite::Error), + WriteStreamClose(Box), } /// Flashblock mark from the WS listener (separate channel to avoid backpressure on flush)