feat(spam-stream): per-signer concurrent sends #590
drive_stream sent serially (send_one().await per spec), capping throughput at ~one RPC round-trip per tx (~20/s). Replace it with a per-signer worker pool: spawn one worker per pool signer, route each spec to the worker that owns its signer (round-robin by idx, matching make_strict_call's signer selection), and let workers send concurrently. Each signer's sends stay serial within its worker, so nonce assignment and reclaim-on-rejection (reuse the nonce, no gap) remain correct with no shared nonce state or locks. Concurrency == pool size; a pool of 1 reproduces the original serial behavior. Workers build/sign against a shared Arc<TestScenario> with a worker-local nonce (bypassing prepare_tx_request's &mut), and the gas price is shared via an atomic refreshed once per interval. Validated relaying interop executing messages: at a sustained rate above the serial ceiling, cross-chain inclusion latency dropped from p50 80s (queue-bound) to p50 4s with a 32-signer pool.
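The shape of the worker pool described above can be sketched roughly as follows. This is an illustration only, not the PR's code: it uses std threads and channels instead of async tasks, and `Spec`, `fake_send`, and `run_pool` are hypothetical names invented for the example. The parts taken from the description are the round-robin routing by idx, the worker-local nonce that advances only on accept, and the gas price shared through an atomic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a tx spec; the real type is not shown in this PR page.
struct Spec {
    payload: u64,
}

/// Hypothetical send: returns true when the node "accepts" the tx.
/// Payloads divisible by 5 are rejected here, purely to exercise nonce reuse.
fn fake_send(_signer: usize, _nonce: u64, _gas_price: u64, spec: &Spec) -> bool {
    spec.payload % 5 != 0
}

/// Routes `specs` to `pool_size` workers and returns (sent, failed, final nonce per worker).
fn run_pool(pool_size: usize, specs: Vec<Spec>) -> (usize, usize, Vec<u64>) {
    // Shared gas price; in the PR it is refreshed once per interval (not modelled here).
    let gas_price = Arc::new(AtomicU64::new(1_000));
    let mut txs = Vec::new();
    let mut handles = Vec::new();

    for signer in 0..pool_size {
        let (tx, rx) = mpsc::channel::<(usize, Spec)>();
        let gas = Arc::clone(&gas_price);
        txs.push(tx);
        handles.push(thread::spawn(move || {
            let mut nonce = 0u64; // worker-local: no shared nonce state, no locks
            let (mut sent, mut failed) = (0usize, 0usize);
            for (_idx, spec) in rx {
                let price = gas.load(Ordering::Relaxed);
                if fake_send(signer, nonce, price, &spec) {
                    nonce += 1; // accepted: advance
                    sent += 1;
                } else {
                    failed += 1; // rejected: keep the nonce, so no gap
                }
            }
            (sent, failed, nonce)
        }));
    }

    // Round-robin by idx, so a given signer's sends always land on one worker.
    for (idx, spec) in specs.into_iter().enumerate() {
        txs[idx % pool_size].send((idx, spec)).expect("worker exited early");
    }
    drop(txs); // closing the channels lets the workers finish

    let (mut sent, mut failed, mut nonces) = (0, 0, Vec::new());
    for handle in handles {
        let (s, f, n) = handle.join().expect("worker panicked");
        sent += s;
        failed += f;
        nonces.push(n);
    }
    (sent, failed, nonces)
}
```

With a pool size of 1 the sketch degenerates to a single serial worker, which matches the "pool of 1 reproduces the original serial behavior" claim.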
…igner concurrency
Extract the two correctness invariants of the per-signer worker model into pure helpers and unit-test them (no RPC needed):
- worker_index(idx, n): round-robin routing; assert idx and idx+n map to the same worker (so the same signer) and that it matches make_strict_call's idx % signers.len() pick. This is the property that keeps each signer serial.
- next_nonce(nonce, submitted): advance on accept, reuse on rejection (no gap).
Also assert pool signers are distinct (per-signer workers depend on it). Update docs/stream-mode.md: data-flow now shows the per-signer worker pool (concurrency == --pool-size, pool of 1 = serial) and the reuse table reflects workers building directly with a worker-local nonce instead of prepare_tx_request.
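A minimal sketch of what the two pure helpers might look like, together with the kind of unit tests the commit describes. The helper names and argument names come from the commit message; the concrete types (`usize`, `u64`, `bool`) are assumptions, since the actual signatures are not shown here.

```rust
/// Round-robin routing: spec `idx` goes to worker `idx % n_workers`.
/// Assumes `n_workers > 0` (a pool always holds at least one signer).
fn worker_index(idx: usize, n_workers: usize) -> usize {
    idx % n_workers
}

/// Advance the worker-local nonce only if the node accepted the tx;
/// on rejection the same nonce is returned so it can be reused (no gap).
fn next_nonce(nonce: u64, submitted: bool) -> u64 {
    if submitted {
        nonce + 1
    } else {
        nonce
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn idx_and_idx_plus_n_share_a_worker() {
        let n = 4;
        for idx in 0..32 {
            assert_eq!(worker_index(idx, n), worker_index(idx + n, n));
        }
    }

    #[test]
    fn nonce_advances_on_accept_and_is_reused_on_rejection() {
        assert_eq!(next_nonce(7, true), 8);
        assert_eq!(next_nonce(7, false), 7);
    }
}
```

Keeping both helpers free of RPC or scenario state is what lets the invariants be checked without a node.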
…urrency knob
The per-signer concurrent-sends change resolves the last open question (concurrency bounded by pool size). Drop the resolved questions, keep the one real follow-up (Spammer-trait reuse) under Follow-ups, and note in the CLI table that --pool-size sets send concurrency (no new flag).
}
// Route to the worker that owns this idx's signer.
let w = worker_index(idx, n_workers);
if worker_txs[w].send((idx, spec)).await.is_err() {
This send(...).await silently blocks the whole dispatch loop whenever a worker's channel saturates. Under asymmetric per-signer drain (e.g. a node that throttles one sender's pending txs), healthy workers are stalled too, without any signal to the producer. The reader/drive_stream hop handles this in forward_lines by emitting a Backpressure event once per saturation episode before blocking. The worker hop is the one place where backpressure isn't currently observable. Maybe worth mirroring the same pattern so a saturated signer shows up as a backpressure event instead of a silent drop in throughput?
Fixed in fcff894. Worker dispatch now uses try_send first, emits the existing Backpressure event once per worker saturation episode, then blocks only after making saturation observable.
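The try-then-block pattern described in this reply can be illustrated with a small sketch. It is not the code from fcff894: it uses the standard library's bounded `sync_channel` rather than an async channel, and `dispatch`, the `saturated` flag, and the `events` vector are hypothetical stand-ins for the real dispatch loop and the Backpressure event.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Tries a non-blocking send first. If the channel is full, records one
/// backpressure notice per saturation episode and only then blocks.
/// Returns false if the receiving worker has gone away.
fn dispatch<T>(
    tx: &SyncSender<T>,
    item: T,
    saturated: &mut bool,     // true while the current saturation episode lasts
    events: &mut Vec<String>, // stand-in for emitting a Backpressure event
    worker: usize,
) -> bool {
    match tx.try_send(item) {
        Ok(()) => {
            // The channel had room again, so the episode (if any) is over.
            *saturated = false;
            true
        }
        Err(TrySendError::Full(item)) => {
            if !*saturated {
                *saturated = true;
                events.push(format!("backpressure: worker {worker}"));
            }
            // Saturation is now observable; fall back to a blocking send.
            tx.send(item).is_ok()
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

Tracking the flag per worker is what keeps the signal to one event per episode instead of one per blocked send.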
let mut sent = 0usize;
let mut failed = 0usize;
for handle in worker_handles {
if let Ok((s, f)) = handle.await {
A panic in a worker thread will be swallowed. Might want to log it to make that failure visible.
match handle.await {
Ok((s, f)) => { sent += s; failed += f; }
Err(e) => warn!("stream: send worker panicked: {e}"),
}
Fixed in fcff894. Worker joins now match on JoinError and log panics with warn!, while still tallying successful worker results.
Summary
spam-stream's drive_stream currently sends serially (send_one(...).await per spec), so throughput is capped at roughly one RPC round-trip per tx (~20/s). When the stream supplies specs faster than that (e.g. relaying interop executing messages), a send queue builds and end-to-end latency balloons.
This makes the send path concurrent while keeping nonce handling correct.
Approach: per-signer worker pool
Spawn one worker per pool signer. Each incoming spec is routed (round-robin by idx, matching make_strict_call's idx % signers.len() selection) to the worker that owns its signer, and workers send concurrently.
Why per-signer rather than a semaphore over all sends: a bounced/rejected send must reuse its nonce (otherwise a gap stalls every later tx from that signer). That reclaim is only correct if a given signer's sends are serialized. Pinning each signer to one worker gives exactly that:
Workers build/sign against a shared Arc<TestScenario> using a worker-local nonce (bypassing prepare_tx_request's &mut), with the gas price shared via an atomic refreshed once per interval.
Compatibility
--pool-size 1 reproduces the original serial behavior exactly.
StreamEvent schema are unchanged (the Go-side/reactive consumer correlates by idx, so out-of-order emits are fine).
spam_stream.rs.
Validation
Relaying interop executing messages at a sustained rate above the serial ceiling, with a 32-signer pool: cross-chain inclusion latency dropped from p50 80s (queue-bound) to p50 4s, with send→receipt p50 ~1.5s and 0 failures. At --pool-size 1 behavior is unchanged.