Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions v2/crates/wifi-densepose-sensing-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6182,11 +6182,12 @@ async fn main() {
let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts));
match mcfg.validate() {
Ok(()) => {
let node_id = mcfg.client_id.clone();
// Template builder only: the real node_id is set per-node by the
// publisher from each snapshot's node_id.
let builder = mqtt::publisher::OwnedDiscoveryBuilder {
discovery_prefix: mcfg.discovery_prefix.clone(),
node_id: node_id.clone(),
node_friendly_name: Some("RuView".to_string()),
node_id: "template".to_string(),
node_friendly_name: None,
sw_version: env!("CARGO_PKG_VERSION").to_string(),
model: "RuView WiFi Sensing".to_string(),
via_device: None,
Expand All @@ -6200,37 +6201,47 @@ async fn main() {
let Ok(v) = serde_json::from_str::<serde_json::Value>(&json) else {
continue;
};
let cls = &v["classification"];
let vit = &v["vital_signs"];
let presence = cls["presence"].as_bool().unwrap_or(false);
let n_persons = v["persons"]
.as_array()
.map(|a| a.len() as u32)
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
.unwrap_or(0);
let motion = match cls["motion_level"].as_str() {
Some("none") | Some("still") | Some("idle") | Some("") => 0.0,
Some(_) => 1.0,
None => 0.0,
};
let snap = mqtt::state::VitalsSnapshot {
node_id: node_id.clone(),
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
presence,
motion,
presence_score: if presence {
cls["confidence"].as_f64().unwrap_or(1.0)
} else {
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
n_persons,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
..Default::default()
// Per-node fan-out: the sensing broadcast carries a
// `nodes` array (same shape as REST /api/v1/nodes).
// Emit one VitalsSnapshot per node, preserving ids.
let Some(arr) = v["nodes"].as_array() else {
continue;
};
let _ = vtx.send(snap);
let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
for node in arr {
// node_id may arrive as a JSON number (REST shape)
// or as a string; accept either, skip anything else.
let id = match &node["node_id"] {
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::String(s) if !s.is_empty() => s.clone(),
_ => continue,
};
let motion = match node["motion_level"].as_str() {
Some("absent") | Some("none") | Some("still")
| Some("idle") | Some("") | None => 0.0,
Some(_) => 1.0,
};
let presence = motion > 0.0
&& node["status"].as_str() == Some("active");
let snap = mqtt::state::VitalsSnapshot {
node_id: id,
timestamp_ms: ts_ms,
presence,
motion,
presence_score: if presence { 1.0 } else { 0.0 },
// person_count is u64 in the REST shape; clamp into u32.
n_persons: node["person_count"]
.as_u64()
.unwrap_or(0)
.min(u32::MAX as u64) as u32,
rssi_dbm: node["rssi_dbm"].as_f64(),
// Remaining fields (motion_energy, breathing/heart rate,
// vital_confidence) are not part of the per-node summary
// emitted on this broadcast; Default (None/0.0) is correct.
..Default::default()
};
Comment thread
mschabhuettl marked this conversation as resolved.
let _ = vtx.send(snap);
}
}
});
tracing::info!("MQTT publisher started -> {host}:{port}");
Expand Down
138 changes: 119 additions & 19 deletions v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
//! through the privacy filter, gate by [`RateLimiter`], encode via
//! [`StateEncoder`], publish.
//!
//! ## Per-node discovery
//!
//! Each inbound [`VitalsSnapshot`] carries its own `node_id`. The publisher
//! keeps a map of per-node discovery identities and registers a new HA
//! device the first time a node id is seen on the stream, so N physical
//! nodes surface as N Home-Assistant devices (one per room).
//!
//! ## Reconnect strategy
//!
//! `rumqttc::EventLoop` reconnects automatically with backoff. After a
Expand All @@ -27,6 +34,7 @@
//! we last refreshed needs them) and reset the rate limiter so the
//! first post-reconnect sample emits promptly.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -80,6 +88,26 @@ impl NodeAvailability {
}
}

/// Runtime state the publisher loop keeps for a single node id.
///
/// Bundles the node's discovery identity, the availability data derived from
/// that identity, and a rate limiter of its own, so that one chatty room
/// cannot eat into another room's publish budget.
struct NodeEntry {
    /// Owned discovery identity for this node.
    builder: OwnedDiscoveryBuilder,
    /// Availability data computed once from `builder` and the entity list.
    avail: NodeAvailability,
    /// Rate limiter dedicated to this node.
    rate_limiter: RateLimiter,
}

impl NodeEntry {
    /// Builds the entry for `builder`: derives its availability data for
    /// `entities` and starts it off with a fresh rate limiter.
    fn new(builder: OwnedDiscoveryBuilder, entities: &[EntityKind]) -> Self {
        // Availability is derived from a short-lived borrowed view of the
        // builder; the view goes out of scope before `builder` is moved into
        // the struct below.
        let avail = {
            let view = builder.as_borrowed();
            NodeAvailability::for_builder(&view, entities)
        };
        let rate_limiter = RateLimiter::new();
        NodeEntry {
            builder,
            avail,
            rate_limiter,
        }
    }
}

/// Spawn the MQTT publisher background task. Returns the join handle so
/// the caller can `await` it on shutdown. Errors during connection are
/// retried internally by `rumqttc::EventLoop`.
Expand Down Expand Up @@ -117,6 +145,21 @@ impl OwnedDiscoveryBuilder {
via_device: self.via_device.as_deref(),
}
}

/// Derive a per-node builder from this template.
///
/// The result is a clone of `self` whose `node_id` is replaced by the given
/// id; the HA device identity and every topic derive from `node_id`, so a
/// different id yields a distinct device. The friendly name is taken from
/// `friendly` alone: `Some(name)` becomes the device name, while `None`
/// clears any name stored on the template, in which case
/// `DiscoveryBuilder::device` falls back to "RuView node <id>".
fn for_node(&self, node_id: &str, friendly: Option<&str>) -> OwnedDiscoveryBuilder {
    let mut derived = self.clone();
    derived.node_id = node_id.to_owned();
    // `Some(name)` -> owned copy of the name, `None` -> `None`; the template's
    // own friendly name is never inherited.
    derived.node_friendly_name = friendly.map(str::to_owned);
    derived
}
}

/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
Expand All @@ -129,22 +172,22 @@ async fn run(
let opts = build_mqtt_options(&cfg);
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);

let builder_borrowed = builder_owned.as_borrowed();
let entities = DiscoveryBuilder::enabled_entities(
cfg.privacy_mode,
cfg.publish_pose,
&[], // no_semantic — wire from cli::Args in P3.5
);

if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] initial discovery publish failed: {e}");
}
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] initial availability publish failed: {e}");
}

let mut rate_limiter = RateLimiter::new();
// One discovery identity + availability + rate limiter PER node id seen on
// the stream. Nodes are discovered lazily: the first snapshot from a new
// node id triggers its retained discovery + availability publish.
//
// NOTE: entries are never evicted — a node that goes silent keeps its HA
// device marked `online` (the broadcast carries no per-node "gone" signal).
// Bounded in practice by the deployment's physical node count. A future
// improvement could track last-seen per node and flip availability to
// `offline` after a timeout; intentionally out of scope for this fix.
let mut nodes: HashMap<String, NodeEntry> = HashMap::new();
Comment thread
mschabhuettl marked this conversation as resolved.
let mut last_heartbeat = Instant::now();
let mut last_refresh = Instant::now();
let start_instant = Instant::now();
Expand All @@ -155,7 +198,7 @@ async fn run(
prefix = %cfg.discovery_prefix,
entities = entities.len(),
privacy = cfg.privacy_mode,
"[mqtt] publisher started",
"[mqtt] publisher started (per-node discovery)",
);

loop {
Expand All @@ -169,7 +212,14 @@ async fn run(
Ok(_) => {}
Err(e) => {
error!("[mqtt] event loop error, will reconnect: {e}");
rate_limiter.reset();
for n in nodes.values_mut() {
n.rate_limiter.reset();
}
// Force re-publish of discovery for all known nodes on
// the next refresh tick (HA may have restarted).
last_refresh = Instant::now()
.checked_sub(Duration::from_secs(cfg.refresh_secs))
.unwrap_or_else(Instant::now);
// Brief backoff before next poll attempt.
tokio::time::sleep(Duration::from_millis(500)).await;
}
Expand All @@ -179,14 +229,20 @@ async fn run(
// Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => {
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
for n in nodes.values() {
if let Err(e) = publish_availability(&client, &n.avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
}
last_heartbeat = Instant::now();
}
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] discovery refresh failed: {e}");
for n in nodes.values() {
if let Err(e) =
publish_all_discovery(&client, &n.builder.as_borrowed(), &entities).await
{
warn!("[mqtt] discovery refresh failed: {e}");
}
}
last_refresh = Instant::now();
}
Expand All @@ -197,15 +253,59 @@ async fn run(
match recv {
Ok(snap) => {
let elapsed = start_instant.elapsed();
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;

// Lazily register a brand-new node the first time its id is
// seen: publish retained discovery + availability once, then
// insert. `contains_key` guards a single insert; the publish
// borrows `client`/`entities` (not `nodes`), so there is no
// borrow conflict with the `get_mut` below.
if !nodes.contains_key(&snap.node_id) {
let nb = builder_owned.for_node(&snap.node_id, None);
let entry = NodeEntry::new(nb, &entities);
if let Err(e) =
publish_all_discovery(&client, &entry.builder.as_borrowed(), &entities).await
{
warn!(
"[mqtt] discovery publish failed for node {}: {e}",
snap.node_id
);
}
if let Err(e) =
publish_availability(&client, &entry.avail, "online").await
{
warn!(
"[mqtt] availability publish failed for node {}: {e}",
snap.node_id
);
}
info!("[mqtt] registered HA device for node {}", snap.node_id);
nodes.insert(snap.node_id.clone(), entry);
}

// Route the snapshot to its node's builder + rate limiter.
// Always present here (just inserted above if it was new).
if let Some(entry) = nodes.get_mut(&snap.node_id) {
Comment thread
mschabhuettl marked this conversation as resolved.
let b = entry.builder.as_borrowed();
publish_snapshot(
&client,
&b,
&snap,
&cfg,
&mut entry.rate_limiter,
elapsed,
)
.await;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
}
Err(broadcast::error::RecvError::Closed) => {
info!("[mqtt] broadcast channel closed, draining");
// Publish offline before exit.
let _ = publish_availability(&client, &avail, "offline").await;
// Publish offline for every known node before exit.
for n in nodes.values() {
let _ = publish_availability(&client, &n.avail, "offline").await;
}
let _ = client.disconnect().await;
return;
}
Expand Down