From 55cdf347fb2d9f4fbddfedcbc8d7e4b6ff09c27f Mon Sep 17 00:00:00 2001 From: Claude Code Date: Tue, 19 May 2026 14:38:30 -0700 Subject: [PATCH 1/8] new state --- rust_snuba/src/strategies/blq_router.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index d11755d840..f295ebe5ac 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -58,6 +58,8 @@ enum State { pub struct BLQRouter { next_step: Next, state: State, + prev_flag_state: bool, + blq_active: bool, producer: ProduceStrategy, // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well @@ -93,7 +95,7 @@ where Next: ProcessingStrategy + 'static, ProduceStrategy: ProcessingStrategy + 'static, { - fn is_enabled(&self) -> bool { + fn is_enabled() -> bool { options("snuba") .ok() .and_then(|o| o.get("consumer.blq_enabled").ok()) @@ -131,9 +133,12 @@ where } fn new_with_strategy(next_step: Next, blq_producer: ProduceStrategy) -> Self { + let flag = Self::is_enabled(); Self { next_step, state: State::Idle, + prev_flag_state: flag, + blq_active: flag, producer: blq_producer, _concurrency: None, } @@ -146,6 +151,7 @@ where ProduceStrategy: ProcessingStrategy + 'static, { fn poll(&mut self) -> Result, StrategyError> { + self.prev_flag_state = Self::is_enabled(); let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); match &mut self.state { @@ -160,7 +166,7 @@ where } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if !self.is_enabled() { + if !Self::is_enabled() { return self.next_step.submit(message); } From 02836812b0e67a4e44ba48c44854dc0bc75a6abf Mon Sep 17 00:00:00 2001 From: Claude Code Date: Tue, 19 May 2026 14:51:41 -0700 Subject: [PATCH 2/8] flip restart logic --- rust_snuba/src/strategies/blq_router.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index f295ebe5ac..c00ed20669 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -151,7 +151,14 @@ where ProduceStrategy: ProcessingStrategy + 'static, { fn poll(&mut self) -> Result, StrategyError> { - self.prev_flag_state = Self::is_enabled(); + let new_flag = Self::is_enabled(); + if !self.prev_flag_state && new_flag { + tracing::info!( + "consumer.blq_enabled flipped on at runtime; exiting consumer to flush downstream state" + ); + std::process::exit(0); + } + self.prev_flag_state = new_flag; let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); match &mut self.state { From 933f7b82861d4b1a9717ffcd307e347daf751254 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 20 May 2026 13:40:59 -0700 Subject: [PATCH 3/8] blq knows consumer group --- rust_snuba/src/factory_v2.rs | 2 ++ rust_snuba/src/strategies/blq_router.rs | 41 ++++++++++++++++++++----- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 00ad6cdbf1..102069014f 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -282,6 +282,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { next_step, blq_producer_config.clone(), blq_topic, + self.physical_consumer_group.clone(), )) } else { tracing::info!("Not using a backlog-queue",); @@ -428,6 +429,7 @@ impl ConsumerStrategyFactoryV2 { next_step, blq_producer_config.clone(), blq_topic, + self.physical_consumer_group.clone(), )) } else { tracing::info!("Not using a backlog-queue",); diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index c00ed20669..52d5e9f624 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -64,6 +64,7 @@ pub struct BLQRouter { // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well _concurrency: Option, + _consumer_group: String, } impl BLQRouter> @@ -76,7 +77,12 @@ where /// The stale threshold and static friction are read at runtime from sentry-options /// (`consumer.blq_stale_threshold_seconds`, `consumer.blq_static_friction_seconds`) /// so they can be tuned without a restart. - pub fn new(next_step: Next, blq_producer_config: KafkaConfig, blq_topic: Topic) -> Self { + pub fn new( + next_step: Next, + blq_producer_config: KafkaConfig, + blq_topic: Topic, + consumer_group: String, + ) -> Self { let concurrency = ConcurrencyConfig::new(10); let blq_producer = Produce::new( CommitOffsets::new(Duration::from_millis(250)), @@ -84,7 +90,7 @@ where &concurrency, TopicOrPartition::Topic(blq_topic), ); - let mut router = Self::new_with_strategy(next_step, blq_producer); + let mut router = Self::new_with_strategy(next_step, blq_producer, consumer_group); router._concurrency = Some(concurrency); router } @@ -132,7 +138,11 @@ where } } - fn new_with_strategy(next_step: Next, blq_producer: ProduceStrategy) -> Self { + fn new_with_strategy( + next_step: Next, + blq_producer: ProduceStrategy, + consumer_group: String, + ) -> Self { let flag = Self::is_enabled(); Self { next_step, @@ -141,6 +151,7 @@ where blq_active: flag, producer: blq_producer, _concurrency: None, + _consumer_group: consumer_group, } } } @@ -328,7 +339,11 @@ mod tests { ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) .unwrap(); - let mut router = BLQRouter::new_with_strategy(MockStrategy::new(), MockStrategy::new()); + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + "test_consumer_group".to_string(), + ); // consuming messages as normal for _ in 0..10 { router.submit(make_message(Utc::now())).unwrap(); @@ -373,7 +388,11 @@ mod tests { ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) .unwrap(); - let mut router = BLQRouter::new_with_strategy(MockStrategy::new(), MockStrategy::new()); + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + "test_consumer_group".to_string(), + ); // backlog of 10 stale messages for _ in 0..10 { router @@ -399,7 +418,11 @@ mod tests { // When the feature flag is not set, stale messages should pass through // to next_step instead of being routed to BLQ init_config(); - let mut router = BLQRouter::new_with_strategy(MockStrategy::new(), MockStrategy::new()); + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + "test_consumer_group".to_string(), + ); for _ in 0..5 { router @@ -424,7 +447,11 @@ mod tests { ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) .unwrap(); - let mut router = BLQRouter::new_with_strategy(MockStrategy::new(), MockStrategy::new()); + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + "test_consumer_group".to_string(), + ); for _ in 0..5 { router From 6e2fac931748813fe50d73c370c97dd0f746ab4c Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 20 May 2026 14:25:18 -0700 Subject: [PATCH 4/8] flag is now consumer group specific --- rust_snuba/src/strategies/blq_router.rs | 29 ++++++++++++++++-------- sentry-options/schemas/snuba/schema.json | 4 ++-- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 52d5e9f624..e8f63300d5 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -64,7 +64,7 @@ pub struct BLQRouter { // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well _concurrency: Option, - _consumer_group: String, + consumer_group: String, } impl BLQRouter> @@ -101,11 +101,12 @@ where Next: ProcessingStrategy + 'static, ProduceStrategy: ProcessingStrategy + 'static, { - fn is_enabled() -> bool { + fn is_enabled(consumer_group: &str) -> bool { options("snuba") .ok() .and_then(|o| o.get("consumer.blq_enabled").ok()) - .and_then(|v| v.as_bool()) + .and_then(|v| v.as_str().map(str::to_owned)) + .map(|s| s != "" && s == consumer_group) .unwrap_or(false) } @@ -143,7 +144,7 @@ where blq_producer: ProduceStrategy, consumer_group: String, ) -> Self { - let flag = Self::is_enabled(); + let flag = Self::is_enabled(&consumer_group); Self { next_step, state: State::Idle, @@ -151,7 +152,7 @@ where blq_active: flag, producer: blq_producer, _concurrency: None, - _consumer_group: consumer_group, + consumer_group, } } } @@ -162,7 +163,7 @@ where ProduceStrategy: ProcessingStrategy + 'static, { fn poll(&mut self) -> Result, StrategyError> { - let new_flag = Self::is_enabled(); + let new_flag = Self::is_enabled(&self.consumer_group); if !self.prev_flag_state && new_flag { tracing::info!( "consumer.blq_enabled flipped on at runtime; exiting consumer to flush downstream state" @@ -184,7 +185,7 @@ where } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if !Self::is_enabled() { + if !Self::is_enabled(&self.consumer_group) { return self.next_step.submit(message); } @@ -334,7 +335,11 @@ mod tests { */ init_config(); let _guard = override_options(&[ - ("snuba", "consumer.blq_enabled", json!(true)), + ( + "snuba", + "consumer.blq_enabled", + json!("test_consumer_group"), + ), ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) @@ -383,7 +388,11 @@ mod tests { */ init_config(); let _guard = override_options(&[ - ("snuba", "consumer.blq_enabled", json!(true)), + ( + "snuba", + "consumer.blq_enabled", + json!("test_consumer_group"), + ), ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) @@ -442,7 +451,7 @@ mod tests { // When the feature flag is explicitly false, stale messages should pass through init_config(); let _guard = override_options(&[ - ("snuba", "consumer.blq_enabled", json!(false)), + ("snuba", "consumer.blq_enabled", json!("")), ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) diff --git a/sentry-options/schemas/snuba/schema.json b/sentry-options/schemas/snuba/schema.json index 2ea86bd21c..ef9a18c609 100644 --- a/sentry-options/schemas/snuba/schema.json +++ b/sentry-options/schemas/snuba/schema.json @@ -13,8 +13,8 @@ "description": "true to emit a tracing log each time a duplicate trace item is detected in the accepted-outcomes aggregator" }, "consumer.blq_enabled": { - "type": "boolean", - "default": false, + "type": "string", + "default": "", "description": "enable backlog queue in snuba consumers" }, "consumer.blq_stale_threshold_seconds": { From 95a87813404c371ff906d60246d2d33cb383c6a8 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 20 May 2026 14:57:10 -0700 Subject: [PATCH 5/8] new logic --- rust_snuba/src/strategies/blq_router.rs | 116 ++++-------------------- 1 file changed, 16 insertions(+), 100 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index e8f63300d5..af8de2471d 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -2,32 +2,6 @@ //! //! BLQ Router is an arroyo strategy that re-directs stale messages (by timestamp) to a configured backlog-queue topic. //! Non-stale messages will be passively forwarded along to the next step in the arroyo strategy pipeline. -//! -//! ## Implementation -//! its essentially a FSM -//! -//! ```text -//! > forward to next step -//! ┌fresh─┐ -//! │ ▼ -//! ┌──┴─────────┐ -//! ┌fresh─►│ Forwarding ├──stale──► PANIC -//! │ └────────────┘ -//! │ ▲ -//! ┌──────┐ │ │ -//! ────►│ Idle ├─┤ fresh -//! └──────┘ │ │ -//! │ ┌──────┴───────┐ -//! └stale─►│ RoutingStale │ -//! └─┬────────────┘ -//! │ ▲ -//! └─stale──┘ -//! > redirect to blq -//! ``` -//! -//! the reason for the panic is that there may be accumulated data downstream that needs to be flushed before we start -//! redirecting to backlog and committing those messages. The most reliable way to do this is crashing the consumer, -//! when it comes back alive the first messages it gets will be stale so it will go straight from idle to RoutingStale. use std::time::Duration; @@ -44,20 +18,8 @@ use sentry_arroyo::processing::strategies::{ use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; use sentry_options::options; -#[derive(Debug, PartialEq)] -enum State { - Idle, // no messages have gone through the router yet - RoutingStale, // router is directing stale messages to the backlog-queue (BLQ) - // we have processed all stale messages and are now flushing (finishing producing to BLQ) - // when we transition to this state we will have CommitRequest for what was flushed, and poll - // will be responsible for returning it - Flushing(Option), - Forwarding, // router is forwarding non-stale messages along to the next strategy -} - pub struct BLQRouter { next_step: Next, - state: State, prev_flag_state: bool, blq_active: bool, producer: ProduceStrategy, @@ -74,8 +36,8 @@ where /// next_step, /// is where fresh messages get forwarded to. /// - /// The stale threshold and static friction are read at runtime from sentry-options - /// (`consumer.blq_stale_threshold_seconds`, `consumer.blq_static_friction_seconds`) + /// The stale threshold is read at runtime from sentry-options + /// (`consumer.blq_stale_threshold_seconds`) /// so they can be tuned without a restart. pub fn new( next_step: Next, @@ -173,74 +135,28 @@ where self.prev_flag_state = new_flag; let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); - match &mut self.state { - State::RoutingStale => produce_result, - State::Forwarding | State::Idle => next_step_result, - State::Flushing(commits) => { - let commits = commits.take(); - self.state = State::Forwarding; - Ok(commits) - } + match &produce_result { + Ok(Some(_)) => produce_result, + _ => next_step_result, } } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if !Self::is_enabled(&self.consumer_group) { + if !self.blq_active { return self.next_step.submit(message); } - let msg_ts = message .timestamp() .expect("Expected kafka message to always have a timestamp, but there wasn't one"); let elapsed = Utc::now() - msg_ts; let stale_threshold = self.stale_threshold(); - let friction = self.static_friction().filter(|f| *f < stale_threshold); - let threshold = match (&self.state, friction) { - (State::RoutingStale, Some(f)) => stale_threshold - f, - _ => stale_threshold, - }; - let is_stale = elapsed > threshold; - match (is_stale, &self.state) { - (true, State::Forwarding) => { - // When we transition from Forwarding to RoutingStale, there may be - // state in memory held downstream. We crash the consumer to get rid of internal state - // when it restarts it will have no internal state (State::Empty) and the first message in - // the topic will be stale. - panic!("Resetting consumer state to begin processing the stale backlog") - } - (true, State::Idle) | (true, State::RoutingStale) => { - // route the stale message to the BLQ - if self.state == State::Idle { - self.state = State::RoutingStale; - } - self.producer.submit(message) - } - (false, State::Idle) | (false, State::Forwarding) => { - // Forward the fresh message along to the next step - if self.state == State::Idle { - self.state = State::Forwarding; - } - self.next_step.submit(message) - } - (false, State::RoutingStale) => { - // We hit a fresh message, so we are done routing the backlog. - // Finish producing and committing all the state messages and - // then switch back to forwarding fresh. - - // i know i shouldnt be blocking in submit but there was no better way to do it - // the pipeline cant make progress until this completes anyways so it should be fine - let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap(); - self.state = State::Flushing(flush_results); - Err(SubmitError::MessageRejected( - sentry_arroyo::processing::strategies::MessageRejected { message }, - )) - } - (true, State::Flushing(_)) | (false, State::Flushing(_)) => { - Err(SubmitError::MessageRejected( - sentry_arroyo::processing::strategies::MessageRejected { message }, - )) - } + let is_stale = elapsed > stale_threshold; + if !is_stale { + self.blq_active = false; + return self.next_step.submit(message); + } else { + self.producer.submit(message) } } @@ -252,10 +168,10 @@ where fn join(&mut self, timeout: Option) -> Result, StrategyError> { let producer_result = self.producer.join(timeout); let next_step_result = self.next_step.join(timeout); - match &self.state { - State::RoutingStale => producer_result, - State::Forwarding | State::Idle => next_step_result, - State::Flushing(commits) => Ok(commits.clone()), + if self.blq_active { + producer_result + } else { + next_step_result } } } From 7da3e8156108526347f722831769804479947424 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 20 May 2026 15:15:22 -0700 Subject: [PATCH 6/8] new logic tested --- rust_snuba/src/strategies/blq_router.rs | 199 +++++++----------------- 1 file changed, 57 insertions(+), 142 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index af8de2471d..85d7006dd7 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -27,6 +27,10 @@ pub struct BLQRouter { // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well _concurrency: Option, consumer_group: String, + + // Invoked from poll() when the flag flips on at runtime. Defaults to + // std::process::exit(0) — overridden in tests so the assertion is observable. + exit_fn: Box, } impl BLQRouter> @@ -85,22 +89,6 @@ where .unwrap_or(TimeDelta::minutes(30)) } - /// Hysteresis applied while in RoutingStale: we keep routing messages at - /// least (stale_threshold - static_friction) old so we don't flip-flop at - /// the boundary. A value <= 0 disables friction. Defaults to 2 minutes. - fn static_friction(&self) -> Option { - let secs = options("snuba") - .ok() - .and_then(|o| o.get("consumer.blq_static_friction_seconds").ok()) - .and_then(|v| v.as_i64()) - .unwrap_or(120); - if secs > 0 { - Some(TimeDelta::seconds(secs)) - } else { - None - } - } - fn new_with_strategy( next_step: Next, blq_producer: ProduceStrategy, @@ -109,12 +97,12 @@ where let flag = Self::is_enabled(&consumer_group); Self { next_step, - state: State::Idle, prev_flag_state: flag, blq_active: flag, producer: blq_producer, _concurrency: None, consumer_group, + exit_fn: Box::new(|| std::process::exit(0)), } } } @@ -130,7 +118,8 @@ where tracing::info!( "consumer.blq_enabled flipped on at runtime; exiting consumer to flush downstream state" ); - std::process::exit(0); + (self.exit_fn)(); + return Ok(None); } self.prev_flag_state = new_flag; let produce_result = self.producer.poll(); @@ -182,8 +171,10 @@ mod tests { use chrono::DateTime; use sentry_arroyo::types::{Partition, Topic}; use sentry_options::init_with_schemas; - use sentry_options::testing::override_options; + use sentry_options::testing::{override_options, set_override}; use serde_json::json; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; use std::sync::Once; static INIT: Once = Once::new(); @@ -193,17 +184,11 @@ mod tests { struct MockStrategy { submitted: Vec>, - join_called: bool, - terminate_called: bool, } impl MockStrategy { fn new() -> Self { - Self { - submitted: vec![], - join_called: false, - terminate_called: false, - } + Self { submitted: vec![] } } } @@ -220,15 +205,12 @@ mod tests { Ok(()) } - fn terminate(&mut self) { - self.terminate_called = true; - } + fn terminate(&mut self) {} fn join( &mut self, _timeout: Option, ) -> Result, StrategyError> { - self.join_called = true; Ok(None) } } @@ -243,65 +225,49 @@ mod tests { } #[test] - #[should_panic(expected = "Resetting consumer state to begin processing the stale backlog")] - fn test_fresh_to_stale() { - /* - This tests that the BLQRouter forwards business-as-usual fresh messages through it - and crashes when it hits its first stale message - */ + fn test_shutoff_when_flag_flips() { + // Flag off: router forwards everything (fresh or stale) to next_step. + // When the flag flips on at runtime, the next poll() invokes exit_fn. init_config(); - let _guard = override_options(&[ - ( - "snuba", - "consumer.blq_enabled", - json!("test_consumer_group"), - ), - ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), - ("snuba", "consumer.blq_static_friction_seconds", json!(0)), - ]) - .unwrap(); + let _guard = + override_options(&[("snuba", "consumer.blq_stale_threshold_seconds", json!(10))]) + .unwrap(); + + let exited = Arc::new(AtomicBool::new(false)); + let exited_clone = exited.clone(); let mut router = BLQRouter::new_with_strategy( MockStrategy::new(), MockStrategy::new(), "test_consumer_group".to_string(), ); - // consuming messages as normal - for _ in 0..10 { - router.submit(make_message(Utc::now())).unwrap(); - _ = router.poll(); - } - assert_eq!(router.state, State::Forwarding); - // now theres a stale message, consumer should crash - _ = router.submit(make_message(Utc::now() - TimeDelta::seconds(20))); - } + router.exit_fn = Box::new(move || exited_clone.store(true, Ordering::SeqCst)); - fn submit_with_retry( - router: &mut BLQRouter, - message: Message, - max_retries: usize, - ) -> Result<(), SubmitError> { - let mut msg = message; - for _ in 0..max_retries { - match router.submit(msg) { - Ok(()) => return Ok(()), - Err(SubmitError::MessageRejected(rejected)) => { - _ = router.poll(); - msg = rejected.message; - } - Err(e) => return Err(e), - } - } - Err(SubmitError::MessageRejected( - sentry_arroyo::processing::strategies::MessageRejected { message: msg }, - )) + assert!(!router.blq_active); + + router.submit(make_message(Utc::now())).unwrap(); + router + .submit(make_message(Utc::now() - TimeDelta::minutes(1))) + .unwrap(); + _ = router.poll(); + assert_eq!(router.next_step.submitted.len(), 2); + assert_eq!(router.producer.submitted.len(), 0); + assert!(!exited.load(Ordering::SeqCst)); + + set_override( + "snuba", + "consumer.blq_enabled", + json!("test_consumer_group"), + ); + + _ = router.poll(); + assert!(exited.load(Ordering::SeqCst)); } #[test] - fn test_stale_to_fresh() { - /* - This tests that the BLQRouter properly routes stale messages to the BLQ - and then switches back to forwarding fresh messages once the backlog is burned - */ + fn test_blq_routes_stale_then_disables_on_fresh() { + // Flag on at boot: stale messages route to the producer; the first fresh + // message flips blq_active off; from then on, fresh AND subsequent stale + // both go to next_step (no re-entry without a process restart). init_config(); let _guard = override_options(&[ ( @@ -310,7 +276,6 @@ mod tests { json!("test_consumer_group"), ), ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), - ("snuba", "consumer.blq_static_friction_seconds", json!(0)), ]) .unwrap(); let mut router = BLQRouter::new_with_strategy( @@ -318,76 +283,26 @@ mod tests { MockStrategy::new(), "test_consumer_group".to_string(), ); - // backlog of 10 stale messages + assert!(router.blq_active); + for _ in 0..10 { router .submit(make_message(Utc::now() - TimeDelta::minutes(1))) .unwrap(); _ = router.poll(); } - assert_eq!(router.state, State::RoutingStale); - assert!(!router.producer.join_called); - // now we are back to fresh messages - for _ in 0..5 { - submit_with_retry(&mut router, make_message(Utc::now()), 3).unwrap(); - _ = router.poll(); - } - assert_eq!(router.state, State::Forwarding); - assert!(router.producer.join_called); assert_eq!(router.producer.submitted.len(), 10); - assert_eq!(router.next_step.submitted.len(), 5); - } + assert_eq!(router.next_step.submitted.len(), 0); + assert!(router.blq_active); - #[test] - fn test_passthrough_when_no_flag() { - // When the feature flag is not set, stale messages should pass through - // to next_step instead of being routed to BLQ - init_config(); - let mut router = BLQRouter::new_with_strategy( - MockStrategy::new(), - MockStrategy::new(), - "test_consumer_group".to_string(), - ); - - for _ in 0..5 { - router - .submit(make_message(Utc::now() - TimeDelta::minutes(1))) - .unwrap(); - _ = router.poll(); - } - - // All stale messages went to next_step, none to producer - assert_eq!(router.next_step.submitted.len(), 5); - assert_eq!(router.producer.submitted.len(), 0); - assert_eq!(router.state, State::Idle); - } + router.submit(make_message(Utc::now())).unwrap(); + assert!(!router.blq_active); - #[test] - fn test_passthrough_when_flag_disabled() { - // When the feature flag is explicitly false, stale messages should pass through - init_config(); - let _guard = override_options(&[ - ("snuba", "consumer.blq_enabled", json!("")), - ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), - ("snuba", "consumer.blq_static_friction_seconds", json!(0)), - ]) - .unwrap(); - let mut router = BLQRouter::new_with_strategy( - MockStrategy::new(), - MockStrategy::new(), - "test_consumer_group".to_string(), - ); - - for _ in 0..5 { - router - .submit(make_message(Utc::now() - TimeDelta::minutes(1))) - .unwrap(); - _ = router.poll(); - } - - // All stale messages went to next_step, none to producer - assert_eq!(router.next_step.submitted.len(), 5); - assert_eq!(router.producer.submitted.len(), 0); - assert_eq!(router.state, State::Idle); + router.submit(make_message(Utc::now())).unwrap(); + router + .submit(make_message(Utc::now() - TimeDelta::minutes(1))) + .unwrap(); + assert_eq!(router.producer.submitted.len(), 10); + assert_eq!(router.next_step.submitted.len(), 3); } } From 5855b280b7eb8077c623ddb48c2580863378ad68 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 20 May 2026 15:27:00 -0700 Subject: [PATCH 7/8] no drop errors --- rust_snuba/src/strategies/blq_router.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 85d7006dd7..3f3e8091c6 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -124,10 +124,9 @@ where self.prev_flag_state = new_flag; let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); - match &produce_result { - Ok(Some(_)) => produce_result, - _ => next_step_result, - } + let produce_commit = produce_result?; + let next_step_commit = next_step_result?; + Ok(produce_commit.or(next_step_commit)) } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { From 7a14ed5949dd9da2038d2adff4a31ce06fd9f3e4 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 21 May 2026 11:28:14 -0700 Subject: [PATCH 8/8] flag fix + linter --- rust_snuba/src/strategies/blq_router.rs | 12 ++++++------ sentry-options/schemas/snuba/schema.json | 9 ++------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 3f3e8091c6..4922f2b01e 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -70,9 +70,9 @@ where fn is_enabled(consumer_group: &str) -> bool { options("snuba") .ok() - .and_then(|o| o.get("consumer.blq_enabled").ok()) + .and_then(|o| o.get("consumer.blq_enabled_2").ok()) .and_then(|v| v.as_str().map(str::to_owned)) - .map(|s| s != "" && s == consumer_group) + .map(|s| !s.is_empty() && s == consumer_group) .unwrap_or(false) } @@ -116,7 +116,7 @@ where let new_flag = Self::is_enabled(&self.consumer_group); if !self.prev_flag_state && new_flag { tracing::info!( - "consumer.blq_enabled flipped on at runtime; exiting consumer to flush downstream state" + "consumer.blq_enabled_2 flipped on at runtime; exiting consumer to flush downstream state" ); (self.exit_fn)(); return Ok(None); @@ -142,7 +142,7 @@ where let is_stale = elapsed > stale_threshold; if !is_stale { self.blq_active = false; - return self.next_step.submit(message); + self.next_step.submit(message) } else { self.producer.submit(message) } @@ -254,7 +254,7 @@ mod tests { set_override( "snuba", - "consumer.blq_enabled", + "consumer.blq_enabled_2", json!("test_consumer_group"), ); @@ -271,7 +271,7 @@ mod tests { let _guard = override_options(&[ ( "snuba", - "consumer.blq_enabled", + "consumer.blq_enabled_2", json!("test_consumer_group"), ), ("snuba", "consumer.blq_stale_threshold_seconds", json!(10)), diff --git a/sentry-options/schemas/snuba/schema.json b/sentry-options/schemas/snuba/schema.json index ef9a18c609..daf5bb176d 100644 --- a/sentry-options/schemas/snuba/schema.json +++ b/sentry-options/schemas/snuba/schema.json @@ -12,20 +12,15 @@ "default": false, "description": "true to emit a tracing log each time a duplicate trace item is detected in the accepted-outcomes aggregator" }, - "consumer.blq_enabled": { + "consumer.blq_enabled_2": { "type": "string", "default": "", - "description": "enable backlog queue in snuba consumers" + "description": "enable backlog queue for the specified consumer group" }, "consumer.blq_stale_threshold_seconds": { "type": "integer", "default": 1800, "description": "BLQ stale threshold in seconds. Messages older than this are routed to the backlog queue." - }, - "consumer.blq_static_friction_seconds": { - "type": "integer", - "default": 120, - "description": "BLQ hysteresis in seconds. Once routing stale, keep routing messages at least (stale_threshold - static_friction) old. Set to 0 to disable friction." } } }