Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 34 additions & 33 deletions v2/crates/wifi-densepose-sensing-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6182,11 +6182,12 @@ async fn main() {
let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts));
match mcfg.validate() {
Ok(()) => {
let node_id = mcfg.client_id.clone();
// Template builder only: the real node_id is set per-node by the
// publisher from each snapshot's node_id.
let builder = mqtt::publisher::OwnedDiscoveryBuilder {
discovery_prefix: mcfg.discovery_prefix.clone(),
node_id: node_id.clone(),
node_friendly_name: Some("RuView".to_string()),
node_id: "template".to_string(),
node_friendly_name: None,
sw_version: env!("CARGO_PKG_VERSION").to_string(),
model: "RuView WiFi Sensing".to_string(),
via_device: None,
Expand All @@ -6200,37 +6201,37 @@ async fn main() {
let Ok(v) = serde_json::from_str::<serde_json::Value>(&json) else {
continue;
};
let cls = &v["classification"];
let vit = &v["vital_signs"];
let presence = cls["presence"].as_bool().unwrap_or(false);
let n_persons = v["persons"]
.as_array()
.map(|a| a.len() as u32)
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
.unwrap_or(0);
let motion = match cls["motion_level"].as_str() {
Some("none") | Some("still") | Some("idle") | Some("") => 0.0,
Some(_) => 1.0,
None => 0.0,
};
let snap = mqtt::state::VitalsSnapshot {
node_id: node_id.clone(),
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
presence,
motion,
presence_score: if presence {
cls["confidence"].as_f64().unwrap_or(1.0)
} else {
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
n_persons,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
..Default::default()
// Per-node fan-out: the sensing broadcast carries a
// `nodes` array (same shape as REST /api/v1/nodes).
// Emit one VitalsSnapshot per node, preserving ids.
let Some(arr) = v["nodes"].as_array() else {
continue;
};
let _ = vtx.send(snap);
let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
for node in arr {
let id = match node["node_id"].as_u64() {
Some(n) => n.to_string(),
None => continue,
};
Comment thread
mschabhuettl marked this conversation as resolved.
Outdated
let motion = match node["motion_level"].as_str() {
Some("absent") | Some("none") | Some("still")
| Some("idle") | Some("") | None => 0.0,
Some(_) => 1.0,
};
let presence = motion > 0.0
&& node["status"].as_str() == Some("active");
let snap = mqtt::state::VitalsSnapshot {
node_id: id,
timestamp_ms: ts_ms,
presence,
motion,
presence_score: if presence { 1.0 } else { 0.0 },
n_persons: node["person_count"].as_u64().unwrap_or(0) as u32,
rssi_dbm: node["rssi_dbm"].as_f64(),
..Default::default()
};
Comment thread
mschabhuettl marked this conversation as resolved.
let _ = vtx.send(snap);
}
}
});
tracing::info!("MQTT publisher started -> {host}:{port}");
Expand Down
128 changes: 109 additions & 19 deletions v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
//! through the privacy filter, gate by [`RateLimiter`], encode via
//! [`StateEncoder`], publish.
//!
//! ## Per-node discovery
//!
//! Each inbound [`VitalsSnapshot`] carries its own `node_id`. The publisher
//! keeps a map of per-node discovery identities and registers a new HA
//! device the first time a node id is seen on the stream, so N physical
//! nodes surface as N Home-Assistant devices (one per room).
//!
//! ## Reconnect strategy
//!
//! `rumqttc::EventLoop` reconnects automatically with backoff. After a
Expand All @@ -27,6 +34,7 @@
//! we last refreshed needs them) and reset the rate limiter so the
//! first post-reconnect sample emits promptly.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -80,6 +88,26 @@ impl NodeAvailability {
}
}

/// Per-node runtime state owned by the publisher loop: the node's discovery
/// identity, its pre-computed availability topics, and an independent rate
/// limiter so one chatty room can't starve another's publish budget.
struct NodeEntry {
builder: OwnedDiscoveryBuilder,
avail: NodeAvailability,
rate_limiter: RateLimiter,
}

impl NodeEntry {
fn new(builder: OwnedDiscoveryBuilder, entities: &[EntityKind]) -> Self {
let avail = NodeAvailability::for_builder(&builder.as_borrowed(), entities);
Self {
builder,
avail,
rate_limiter: RateLimiter::new(),
}
}
}

/// Spawn the MQTT publisher background task. Returns the join handle so
/// the caller can `await` it on shutdown. Errors during connection are
/// retried internally by `rumqttc::EventLoop`.
Expand Down Expand Up @@ -117,6 +145,21 @@ impl OwnedDiscoveryBuilder {
via_device: self.via_device.as_deref(),
}
}

/// Clone this template for a concrete node id. The HA device identity and
/// every topic derive from `node_id`, so changing it yields a distinct
/// device. If `friendly` is `Some`, it overrides the device name;
/// otherwise `DiscoveryBuilder::device` falls back to "RuView node <id>".
fn for_node(&self, node_id: &str, friendly: Option<&str>) -> OwnedDiscoveryBuilder {
let mut b = self.clone();
b.node_id = node_id.to_string();
if let Some(name) = friendly {
b.node_friendly_name = Some(name.to_string());
} else {
b.node_friendly_name = None;
}
b
}
}

/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
Expand All @@ -129,22 +172,16 @@ async fn run(
let opts = build_mqtt_options(&cfg);
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);

let builder_borrowed = builder_owned.as_borrowed();
let entities = DiscoveryBuilder::enabled_entities(
cfg.privacy_mode,
cfg.publish_pose,
&[], // no_semantic — wire from cli::Args in P3.5
);

if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] initial discovery publish failed: {e}");
}
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] initial availability publish failed: {e}");
}

let mut rate_limiter = RateLimiter::new();
// One discovery identity + availability + rate limiter PER node id seen on
// the stream. Nodes are discovered lazily: the first snapshot from a new
// node id triggers its retained discovery + availability publish.
let mut nodes: HashMap<String, NodeEntry> = HashMap::new();
Comment thread
mschabhuettl marked this conversation as resolved.
let mut last_heartbeat = Instant::now();
let mut last_refresh = Instant::now();
let start_instant = Instant::now();
Expand All @@ -155,7 +192,7 @@ async fn run(
prefix = %cfg.discovery_prefix,
entities = entities.len(),
privacy = cfg.privacy_mode,
"[mqtt] publisher started",
"[mqtt] publisher started (per-node discovery)",
);

loop {
Expand All @@ -169,7 +206,14 @@ async fn run(
Ok(_) => {}
Err(e) => {
error!("[mqtt] event loop error, will reconnect: {e}");
rate_limiter.reset();
for n in nodes.values_mut() {
n.rate_limiter.reset();
}
// Force re-publish of discovery for all known nodes on
// the next refresh tick (HA may have restarted).
last_refresh = Instant::now()
.checked_sub(Duration::from_secs(cfg.refresh_secs))
.unwrap_or_else(Instant::now);
// Brief backoff before next poll attempt.
tokio::time::sleep(Duration::from_millis(500)).await;
}
Expand All @@ -179,14 +223,20 @@ async fn run(
// Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => {
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
if let Err(e) = publish_availability(&client, &avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
for n in nodes.values() {
if let Err(e) = publish_availability(&client, &n.avail, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
}
last_heartbeat = Instant::now();
}
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
warn!("[mqtt] discovery refresh failed: {e}");
for n in nodes.values() {
if let Err(e) =
publish_all_discovery(&client, &n.builder.as_borrowed(), &entities).await
{
warn!("[mqtt] discovery refresh failed: {e}");
}
}
last_refresh = Instant::now();
}
Expand All @@ -197,15 +247,55 @@ async fn run(
match recv {
Ok(snap) => {
let elapsed = start_instant.elapsed();
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;

// Lazily register a brand-new node: publish its retained
// discovery + availability exactly once, when first seen.
if !nodes.contains_key(&snap.node_id) {
let nb = builder_owned.for_node(&snap.node_id, None);
let borrowed = nb.as_borrowed();
if let Err(e) =
publish_all_discovery(&client, &borrowed, &entities).await
{
warn!(
"[mqtt] discovery publish failed for node {}: {e}",
snap.node_id
);
}
let entry = NodeEntry::new(nb, &entities);
if let Err(e) =
publish_availability(&client, &entry.avail, "online").await
{
warn!(
"[mqtt] availability publish failed for node {}: {e}",
snap.node_id
);
}
info!("[mqtt] registered HA device for node {}", snap.node_id);
nodes.insert(snap.node_id.clone(), entry);
}

if let Some(entry) = nodes.get_mut(&snap.node_id) {
Comment thread
mschabhuettl marked this conversation as resolved.
let b = entry.builder.as_borrowed();
publish_snapshot(
&client,
&b,
&snap,
&cfg,
&mut entry.rate_limiter,
elapsed,
)
.await;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
}
Err(broadcast::error::RecvError::Closed) => {
info!("[mqtt] broadcast channel closed, draining");
// Publish offline before exit.
let _ = publish_availability(&client, &avail, "offline").await;
// Publish offline for every known node before exit.
for n in nodes.values() {
let _ = publish_availability(&client, &n.avail, "offline").await;
}
let _ = client.disconnect().await;
return;
}
Expand Down