From 4bd0b24f0b67a3a7944a91a28e021163fa1c20a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20Schabh=C3=BCttl?= Date: Mon, 1 Jun 2026 16:03:22 +0200 Subject: [PATCH 1/4] fix(mqtt): publish one HA device per node id (refs #872) --- .../wifi-densepose-sensing-server/src/main.rs | 67 ++++----- .../src/mqtt/publisher.rs | 128 +++++++++++++++--- 2 files changed, 143 insertions(+), 52 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 7fd8f4ec5d..c3337cc533 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -6182,11 +6182,12 @@ async fn main() { let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts)); match mcfg.validate() { Ok(()) => { - let node_id = mcfg.client_id.clone(); + // Template builder only: the real node_id is set per-node by the + // publisher from each snapshot's node_id. let builder = mqtt::publisher::OwnedDiscoveryBuilder { discovery_prefix: mcfg.discovery_prefix.clone(), - node_id: node_id.clone(), - node_friendly_name: Some("RuView".to_string()), + node_id: "template".to_string(), + node_friendly_name: None, sw_version: env!("CARGO_PKG_VERSION").to_string(), model: "RuView WiFi Sensing".to_string(), via_device: None, @@ -6200,37 +6201,37 @@ async fn main() { let Ok(v) = serde_json::from_str::(&json) else { continue; }; - let cls = &v["classification"]; - let vit = &v["vital_signs"]; - let presence = cls["presence"].as_bool().unwrap_or(false); - let n_persons = v["persons"] - .as_array() - .map(|a| a.len() as u32) - .or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32)) - .unwrap_or(0); - let motion = match cls["motion_level"].as_str() { - Some("none") | Some("still") | Some("idle") | Some("") => 0.0, - Some(_) => 1.0, - None => 0.0, - }; - let snap = mqtt::state::VitalsSnapshot { - node_id: node_id.clone(), - timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64, - presence, - motion, - presence_score: if presence { - cls["confidence"].as_f64().unwrap_or(1.0) - } else { - 0.0 - }, - breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(), - heartrate_bpm: vit["heart_rate_bpm"].as_f64(), - n_persons, - rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(), - vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0), - ..Default::default() + // Per-node fan-out: the sensing broadcast carries a + // `nodes` array (same shape as REST /api/v1/nodes). + // Emit one VitalsSnapshot per node, preserving ids. + let Some(arr) = v["nodes"].as_array() else { + continue; }; - let _ = vtx.send(snap); + let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; + for node in arr { + let id = match node["node_id"].as_u64() { + Some(n) => n.to_string(), + None => continue, + }; + let motion = match node["motion_level"].as_str() { + Some("absent") | Some("none") | Some("still") + | Some("idle") | Some("") | None => 0.0, + Some(_) => 1.0, + }; + let presence = motion > 0.0 + && node["status"].as_str() == Some("active"); + let snap = mqtt::state::VitalsSnapshot { + node_id: id, + timestamp_ms: ts_ms, + presence, + motion, + presence_score: if presence { 1.0 } else { 0.0 }, + n_persons: node["person_count"].as_u64().unwrap_or(0) as u32, + rssi_dbm: node["rssi_dbm"].as_f64(), + ..Default::default() + }; + let _ = vtx.send(snap); + } } }); tracing::info!("MQTT publisher started -> {host}:{port}"); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs index 20b5fd16e9..f99da61595 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -19,6 +19,13 @@ //! through the privacy filter, gate by [`RateLimiter`], encode via //! [`StateEncoder`], publish. //! +//! ## Per-node discovery +//! +//! Each inbound [`VitalsSnapshot`] carries its own `node_id`. The publisher +//! keeps a map of per-node discovery identities and registers a new HA +//! device the first time a node id is seen on the stream, so N physical +//! nodes surface as N Home-Assistant devices (one per room). +//! //! ## Reconnect strategy //! //! `rumqttc::EventLoop` reconnects automatically with backoff. After a @@ -27,6 +34,7 @@ //! we last refreshed needs them) and reset the rate limiter so the //! first post-reconnect sample emits promptly. +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -80,6 +88,26 @@ impl NodeAvailability { } } +/// Per-node runtime state owned by the publisher loop: the node's discovery +/// identity, its pre-computed availability topics, and an independent rate +/// limiter so one chatty room can't starve another's publish budget. +struct NodeEntry { + builder: OwnedDiscoveryBuilder, + avail: NodeAvailability, + rate_limiter: RateLimiter, +} + +impl NodeEntry { + fn new(builder: OwnedDiscoveryBuilder, entities: &[EntityKind]) -> Self { + let avail = NodeAvailability::for_builder(&builder.as_borrowed(), entities); + Self { + builder, + avail, + rate_limiter: RateLimiter::new(), + } + } +} + /// Spawn the MQTT publisher background task. Returns the join handle so /// the caller can `await` it on shutdown. Errors during connection are /// retried internally by `rumqttc::EventLoop`. @@ -117,6 +145,21 @@ impl OwnedDiscoveryBuilder { via_device: self.via_device.as_deref(), } } + + /// Clone this template for a concrete node id. The HA device identity and + /// every topic derive from `node_id`, so changing it yields a distinct + /// device. If `friendly` is `Some`, it overrides the device name; + /// otherwise `DiscoveryBuilder::device` falls back to "RuView node ". + fn for_node(&self, node_id: &str, friendly: Option<&str>) -> OwnedDiscoveryBuilder { + let mut b = self.clone(); + b.node_id = node_id.to_string(); + if let Some(name) = friendly { + b.node_friendly_name = Some(name.to_string()); + } else { + b.node_friendly_name = None; + } + b + } } /// Core run loop. Pumps the broadcast channel + the MQTT event loop in @@ -129,22 +172,16 @@ async fn run( let opts = build_mqtt_options(&cfg); let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256); - let builder_borrowed = builder_owned.as_borrowed(); let entities = DiscoveryBuilder::enabled_entities( cfg.privacy_mode, cfg.publish_pose, &[], // no_semantic — wire from cli::Args in P3.5 ); - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] initial discovery publish failed: {e}"); - } - let avail = NodeAvailability::for_builder(&builder_borrowed, &entities); - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] initial availability publish failed: {e}"); - } - - let mut rate_limiter = RateLimiter::new(); + // One discovery identity + availability + rate limiter PER node id seen on + // the stream. Nodes are discovered lazily: the first snapshot from a new + // node id triggers its retained discovery + availability publish. + let mut nodes: HashMap = HashMap::new(); let mut last_heartbeat = Instant::now(); let mut last_refresh = Instant::now(); let start_instant = Instant::now(); @@ -155,7 +192,7 @@ async fn run( prefix = %cfg.discovery_prefix, entities = entities.len(), privacy = cfg.privacy_mode, - "[mqtt] publisher started", + "[mqtt] publisher started (per-node discovery)", ); loop { @@ -169,7 +206,14 @@ async fn run( Ok(_) => {} Err(e) => { error!("[mqtt] event loop error, will reconnect: {e}"); - rate_limiter.reset(); + for n in nodes.values_mut() { + n.rate_limiter.reset(); + } + // Force re-publish of discovery for all known nodes on + // the next refresh tick (HA may have restarted). + last_refresh = Instant::now() + .checked_sub(Duration::from_secs(cfg.refresh_secs)) + .unwrap_or_else(Instant::now); // Brief backoff before next poll attempt. tokio::time::sleep(Duration::from_millis(500)).await; } @@ -179,14 +223,20 @@ async fn run( // Periodic heartbeat / discovery refresh. _ = tokio::time::sleep(Duration::from_secs(1)) => { if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT { - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] heartbeat publish failed: {e}"); + for n in nodes.values() { + if let Err(e) = publish_availability(&client, &n.avail, "online").await { + warn!("[mqtt] heartbeat publish failed: {e}"); + } } last_heartbeat = Instant::now(); } if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) { - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] discovery refresh failed: {e}"); + for n in nodes.values() { + if let Err(e) = + publish_all_discovery(&client, &n.builder.as_borrowed(), &entities).await + { + warn!("[mqtt] discovery refresh failed: {e}"); + } } last_refresh = Instant::now(); } @@ -197,15 +247,55 @@ async fn run( match recv { Ok(snap) => { let elapsed = start_instant.elapsed(); - publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; + + // Lazily register a brand-new node: publish its retained + // discovery + availability exactly once, when first seen. + if !nodes.contains_key(&snap.node_id) { + let nb = builder_owned.for_node(&snap.node_id, None); + let borrowed = nb.as_borrowed(); + if let Err(e) = + publish_all_discovery(&client, &borrowed, &entities).await + { + warn!( + "[mqtt] discovery publish failed for node {}: {e}", + snap.node_id + ); + } + let entry = NodeEntry::new(nb, &entities); + if let Err(e) = + publish_availability(&client, &entry.avail, "online").await + { + warn!( + "[mqtt] availability publish failed for node {}: {e}", + snap.node_id + ); + } + info!("[mqtt] registered HA device for node {}", snap.node_id); + nodes.insert(snap.node_id.clone(), entry); + } + + if let Some(entry) = nodes.get_mut(&snap.node_id) { + let b = entry.builder.as_borrowed(); + publish_snapshot( + &client, + &b, + &snap, + &cfg, + &mut entry.rate_limiter, + elapsed, + ) + .await; + } } Err(broadcast::error::RecvError::Lagged(n)) => { warn!("[mqtt] lagged behind broadcast by {n} messages — dropped"); } Err(broadcast::error::RecvError::Closed) => { info!("[mqtt] broadcast channel closed, draining"); - // Publish offline before exit. - let _ = publish_availability(&client, &avail, "offline").await; + // Publish offline for every known node before exit. + for n in nodes.values() { + let _ = publish_availability(&client, &n.avail, "offline").await; + } let _ = client.disconnect().await; return; } From 797a6b4e89f4a050c04b529ba0db2baf5fb8e1dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20Schabh=C3=BCttl?= Date: Mon, 1 Jun 2026 16:11:03 +0200 Subject: [PATCH 2/4] ci: add build check for mqtt feature --- .github/workflows/pr-check.yml | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 .github/workflows/pr-check.yml diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml new file mode 100644 index 0000000000..1e6ab10129 --- /dev/null +++ b/.github/workflows/pr-check.yml @@ -0,0 +1,33 @@ +name: PR build check (mqtt) + +# Baut den sensing-server mit aktiviertem mqtt-Feature, damit der +# Per-Node-Discovery-Patch (#899) verifiziert wird. + +on: + push: + branches: [ "fix/per-node-mqtt-discovery" ] + pull_request: + workflow_dispatch: # erlaubt manuelles Starten ueber den Actions-Tab + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo + uses: Swatinem/rust-cache@v2 + with: + workspaces: v2 + + - name: cargo check (mqtt feature) + working-directory: v2 + run: cargo check -p wifi-densepose-sensing-server --features mqtt + + - name: cargo build (mqtt feature) + working-directory: v2 + run: cargo build -p wifi-densepose-sensing-server --features mqtt From 3ca1043feca339101a72557cba7bb6eb2ce0a65d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20Schabh=C3=BCttl?= Date: Mon, 1 Jun 2026 16:16:37 +0200 Subject: [PATCH 3/4] ci: remove temporary build check --- .github/workflows/pr-check.yml | 33 --------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 .github/workflows/pr-check.yml diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml deleted file mode 100644 index 1e6ab10129..0000000000 --- a/.github/workflows/pr-check.yml +++ /dev/null @@ -1,33 +0,0 @@ -name: PR build check (mqtt) - -# Baut den sensing-server mit aktiviertem mqtt-Feature, damit der -# Per-Node-Discovery-Patch (#899) verifiziert wird. - -on: - push: - branches: [ "fix/per-node-mqtt-discovery" ] - pull_request: - workflow_dispatch: # erlaubt manuelles Starten ueber den Actions-Tab - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable - - - name: Cache cargo - uses: Swatinem/rust-cache@v2 - with: - workspaces: v2 - - - name: cargo check (mqtt feature) - working-directory: v2 - run: cargo check -p wifi-densepose-sensing-server --features mqtt - - - name: cargo build (mqtt feature) - working-directory: v2 - run: cargo build -p wifi-densepose-sensing-server --features mqtt From 0de296f8069f37d1d4f1995b4849d29cad8711bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20Schabh=C3=BCttl?= Date: Mon, 1 Jun 2026 16:26:35 +0200 Subject: [PATCH 4/4] refactor(mqtt): address review feedback on per-node fan-out - main.rs: parse node_id as number-or-string; clamp person_count to u32 - publisher.rs: document unbounded node-map (no eviction); tidy lazy register --- .../wifi-densepose-sensing-server/src/main.rs | 18 +++++++++++++---- .../src/mqtt/publisher.rs | 20 ++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index c3337cc533..b29d376ee2 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -6209,9 +6209,12 @@ async fn main() { }; let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; for node in arr { - let id = match node["node_id"].as_u64() { - Some(n) => n.to_string(), - None => continue, + // node_id may arrive as a JSON number (REST shape) + // or as a string; accept either, skip anything else. + let id = match &node["node_id"] { + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::String(s) if !s.is_empty() => s.clone(), + _ => continue, }; let motion = match node["motion_level"].as_str() { Some("absent") | Some("none") | Some("still") @@ -6226,8 +6229,15 @@ async fn main() { presence, motion, presence_score: if presence { 1.0 } else { 0.0 }, - n_persons: node["person_count"].as_u64().unwrap_or(0) as u32, + // person_count is u64 in the REST shape; clamp into u32. + n_persons: node["person_count"] + .as_u64() + .unwrap_or(0) + .min(u32::MAX as u64) as u32, rssi_dbm: node["rssi_dbm"].as_f64(), + // Remaining fields (motion_energy, breathing/heart rate, + // vital_confidence) are not part of the per-node summary + // emitted on this broadcast; Default (None/0.0) is correct. ..Default::default() }; let _ = vtx.send(snap); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs index f99da61595..319abe3a6d 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -181,6 +181,12 @@ async fn run( // One discovery identity + availability + rate limiter PER node id seen on // the stream. Nodes are discovered lazily: the first snapshot from a new // node id triggers its retained discovery + availability publish. + // + // NOTE: entries are never evicted — a node that goes silent keeps its HA + // device marked `online` (the broadcast carries no per-node "gone" signal). + // Bounded in practice by the deployment's physical node count. A future + // improvement could track last-seen per node and flip availability to + // `offline` after a timeout; intentionally out of scope for this fix. let mut nodes: HashMap = HashMap::new(); let mut last_heartbeat = Instant::now(); let mut last_refresh = Instant::now(); @@ -248,20 +254,22 @@ async fn run( Ok(snap) => { let elapsed = start_instant.elapsed(); - // Lazily register a brand-new node: publish its retained - // discovery + availability exactly once, when first seen. + // Lazily register a brand-new node the first time its id is + // seen: publish retained discovery + availability once, then + // insert. `contains_key` guards a single insert; the publish + // borrows `client`/`entities` (not `nodes`), so there is no + // borrow conflict with the `get_mut` below. if !nodes.contains_key(&snap.node_id) { let nb = builder_owned.for_node(&snap.node_id, None); - let borrowed = nb.as_borrowed(); + let entry = NodeEntry::new(nb, &entities); if let Err(e) = - publish_all_discovery(&client, &borrowed, &entities).await + publish_all_discovery(&client, &entry.builder.as_borrowed(), &entities).await { warn!( "[mqtt] discovery publish failed for node {}: {e}", snap.node_id ); } - let entry = NodeEntry::new(nb, &entities); if let Err(e) = publish_availability(&client, &entry.avail, "online").await { @@ -274,6 +282,8 @@ async fn run( nodes.insert(snap.node_id.clone(), entry); } + // Route the snapshot to its node's builder + rate limiter. + // Always present here (just inserted above if it was new). if let Some(entry) = nodes.get_mut(&snap.node_id) { let b = entry.builder.as_borrowed(); publish_snapshot(