Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions v2/crates/wifi-densepose-sensing-server/src/csi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,16 @@ pub fn extract_features_from_frame(
.clamp(0.0, 1.0)
};

let variance_motion = (temporal_variance / 10.0).clamp(0.0, 1.0);
let mbp_motion = (motion_band_power / 25.0).clamp(0.0, 1.0);
// Normalize the energy-based motion terms by signal power (mean_amp^2) so
// they are independent of absolute amplitude. Without this, adding external
// antennas raised amplitudes ~5x and these raw energies ~30x, pinning both
// terms at the 1.0 clamp — which saturates raw_motion and defeats the
// adaptive baseline subtraction in smooth_and_classify (an empty room then
// reads "present"). Same sqrt-of-power-ratio form as temporal_motion_score
// above. Field-model (--calibrate) path is unaffected.
let amp_ref = mean_amp * mean_amp + 1e-9;
let variance_motion = (temporal_variance / amp_ref).sqrt().clamp(0.0, 1.0);
let mbp_motion = (motion_band_power / amp_ref).sqrt().clamp(0.0, 1.0);
let cp_motion = (change_points as f64 / 15.0).clamp(0.0, 1.0);
let motion_score = (temporal_motion_score * 0.4
+ variance_motion * 0.2
Expand Down
310 changes: 290 additions & 20 deletions v2/crates/wifi-densepose-sensing-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ struct NodeState {
/// Most recent novelty score in [0.0, 1.0] (0 = exact-match in bank,
/// 1 = no overlap). Consumed by the model-wake gate downstream.
pub(crate) last_novelty_score: Option<f32>,
/// CSI de-confounding (occupancy-blind fix): rolling P95 of per-frame spatial
/// CoV² ("frame richness"), used to exclude structureless/null frames.
frame_structure_p95: RollingP95,
/// Count of structureless/null frames excluded from the feature pipeline.
pub(crate) off_structure_frames: u64,
}

/// Default EMA alpha for temporal keypoint smoothing (RuVector Phase 2).
Expand All @@ -462,6 +467,44 @@ const NOVELTY_VECTOR_DIM: usize = 56;
/// ADR-084 Pass 3 — number of past sketches retained per-node for
/// novelty comparison. 64 frames ≈ 6.4 s at 10 Hz.
const NOVELTY_HISTORY_CAPACITY: usize = 64;

// ── CSI de-confounding (occupancy-blind fix) ─────────────────────────────────
// Promiscuous capture interleaves structureless "null" frames (near-constant
// amplitude vectors, spatial CoV² ≈ 0) with real CSI frames (CoV² ~0.4) at the
// SAME subcarrier width and RSSI. Diffing a null frame against a real one in
// `temporal_motion_score` fabricates large fake "motion", which is what makes the
// presence signal bimodal and occupancy-blind. The gate admits a frame into the
// feature pipeline only if its spatial CoV² is at least `DECONFOUND_FRACTION` of
// the node's rolling-P95 CoV² — adaptive, so there is no brittle absolute
// threshold and it self-calibrates per node. Disable with `SENSING_DECONFOUND=0`.
const DECONFOUND_FRACTION: f64 = 0.1;
const DECONFOUND_WINDOW: usize = 256;
const DECONFOUND_MIN_SAMPLES: usize = 16;

/// Scale-invariant spatial dispersion of one frame's amplitudes: `var / mean²`.
/// ≈ 0 for a structureless/null frame; large for a real multipath CSI frame.
fn frame_structure_cov2(amps: &[f64]) -> f64 {
if amps.is_empty() {
return 0.0;
}
let mean = amps.iter().sum::<f64>() / amps.len() as f64;
if mean.abs() < 1e-9 {
return 0.0;
}
let var = amps.iter().map(|a| (a - mean).powi(2)).sum::<f64>() / amps.len() as f64;
var / (mean * mean)
}

/// Whether CSI de-confounding is enabled (env `SENSING_DECONFOUND`, default on).
fn deconfound_enabled() -> bool {
use std::sync::OnceLock;
static ENABLED: OnceLock<bool> = OnceLock::new();
*ENABLED.get_or_init(|| {
std::env::var("SENSING_DECONFOUND")
.map(|v| v != "0" && !v.eq_ignore_ascii_case("false"))
.unwrap_or(true)
})
}
/// ADR-084 Pass 3 — feature-vector schema version. Bump on changes to
/// subcarrier ordering / normalisation so banks reject stale data.
const NOVELTY_SKETCH_VERSION: u16 = 1;
Expand Down Expand Up @@ -647,9 +690,28 @@ impl NodeState {
),
),
last_novelty_score: None,
frame_structure_p95: RollingP95::new(DECONFOUND_WINDOW, DECONFOUND_MIN_SAMPLES),
off_structure_frames: 0,
}
}

/// CSI de-confounding gate (occupancy-blind fix). Returns `false` for a
/// structureless/"null" frame — one whose spatial CoV² (`cov2`) is far below
/// the node's recent rich-frame level — so it is excluded from the feature
/// pipeline (diffing it against a real frame would fabricate temporal
/// "motion"). Admits all frames during warmup. Tracks `off_structure_frames`.
pub(crate) fn admit_frame_structure(&mut self, cov2: f64) -> bool {
self.frame_structure_p95.push(cov2);
let admit = match self.frame_structure_p95.current() {
Some(p95) => cov2 >= DECONFOUND_FRACTION * p95,
None => true, // warmup: not enough samples to know the rich level yet
};
if !admit {
self.off_structure_frames = self.off_structure_frames.saturating_add(1);
}
admit
}

/// ADR-084 cluster-Pi novelty step. Truncates / zero-pads the
/// incoming amplitude vector to `NOVELTY_VECTOR_DIM`, scores its
/// novelty against the per-node bank, then inserts it. The novelty
Expand Down Expand Up @@ -1378,15 +1440,25 @@ fn parse_esp32_frame(buf: &[u8]) -> Option<Esp32Frame> {
let n_antennas = buf[5];
let n_subcarriers = buf[6];
let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]);
let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]);
let rssi_raw = buf[14] as i8;
// Header offsets MUST match firmware csi_collector.c csi_serialize_frame():
// seq -> memcpy(&buf[12], &seq, 4) => [12..16)
// rssi -> buf[16]
// noise_floor -> buf[17]
// Previously these were read 2 bytes early ([10..14), buf[14], buf[15]), which
// sampled the high bytes of the sequence counter instead of rssi/noise_floor —
// the counter's byte 2 stays 0 for the first 65 536 frames, so rssi decoded as
// 0 dBm (an impossible WiFi level). That bogus rssi=0 then zeroed the RSSI-fusion
// weights and SNR-based signal_quality. The I/Q payload at byte 20 was already
// correct (CSI_HEADER_SIZE == 20), so amplitude-derived features were unaffected.
let sequence = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
let rssi_raw = buf[16] as i8;
// Fix RSSI sign: ensure it's always negative (dBm convention).
let rssi = if rssi_raw > 0 {
rssi_raw.saturating_neg()
} else {
rssi_raw
};
let noise_floor = buf[15] as i8;
let noise_floor = buf[17] as i8;

let iq_start = 20;
let n_pairs = n_antennas as usize * n_subcarriers as usize;
Expand Down Expand Up @@ -1824,8 +1896,20 @@ fn extract_features_from_frame(

// Blend temporal motion with variance-based motion for robustness.
// Also factor in motion_band_power and change_points for ESP32 real-world sensitivity.
let variance_motion = (temporal_variance / 10.0).clamp(0.0, 1.0);
let mbp_motion = (motion_band_power / 25.0).clamp(0.0, 1.0);
//
// Normalize the two energy-based terms by signal power (mean_amp^2) so they are
// dimensionless ratios independent of absolute amplitude — the same sqrt-of-power-ratio
// form already used by `temporal_motion_score` above. Without this, the fixed divisors
// (/10, /25) were calibrated for the antenna-less regime; adding external IPEX antennas
// raised amplitudes ~5x and these raw energies ~30x, pinning BOTH terms at the 1.0 clamp.
// A saturated term carries no information and defeats the adaptive baseline subtraction
// in `smooth_and_classify` — so an empty room read "present" indefinitely. Making them
// ratios self-scales to any node/antenna/room and to future mesh nodes. (change_points is
// already a dimensionless count, so it keeps its fixed divisor.) Field-model (--calibrate)
// path is unaffected — it never used these terms.
let amp_ref = mean_amp * mean_amp + 1e-9;
let variance_motion = (temporal_variance / amp_ref).sqrt().clamp(0.0, 1.0);
let mbp_motion = (motion_band_power / amp_ref).sqrt().clamp(0.0, 1.0);
let cp_motion = (change_points as f64 / 15.0).clamp(0.0, 1.0);
let motion_score = (temporal_motion_score * 0.4
+ variance_motion * 0.2
Expand Down Expand Up @@ -5231,8 +5315,32 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());

// Also maintain global frame_history for backward compat
// (simulation path, REST endpoints, etc.).
let node_id = frame.node_id;
// Clone adaptive model before the mutable borrow of node_states
// below (avoids an unsafe raw pointer — review finding #2).
let adaptive_model_clone = s.adaptive_model.clone();

// ── CSI de-confounding (occupancy-blind fix) ───────────────
// Exclude structureless/null frames so the feature pipeline
// (global + per-node) sees a consistent stream — a null frame
// diffed against a real one fabricates temporal "motion".
{
let cov2 = frame_structure_cov2(&frame.amplitudes);
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
// Advance the per-node fps EMA here (the gate is the single
// per-frame entry point); the per-node block no longer does.
ns.observe_csi_frame_arrival(std::time::Instant::now());
if deconfound_enabled() && !ns.admit_frame_structure(cov2) {
debug!(
"node {node_id}: excluding structureless frame \
(cov2={cov2:.4}) from feature pipeline (de-confounding)"
);
continue;
}
}

// Maintain global frame_history (admitted frames only) for the
// person-count fallback / REST endpoints / simulation path.
s.frame_history.push_back(frame.amplitudes.clone());
if s.frame_history.len() > FRAME_HISTORY_CAPACITY {
s.frame_history.pop_front();
Expand Down Expand Up @@ -5263,20 +5371,12 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
}

// ── Per-node processing (issue #249) ──────────────────
// Process entirely within per-node state so different
// ESP32 nodes never mix their smoothing/vitals buffers.
// We scope the mutable borrow of node_states so we can
// access other AppStateInner fields afterward.
let node_id = frame.node_id;
// Clone adaptive model before mutable borrow of node_states
// to avoid unsafe raw pointer (review finding #2).
let adaptive_model_clone = s.adaptive_model.clone();

// Process entirely within per-node state so different ESP32
// nodes never mix their smoothing/vitals buffers. `node_id` /
// `adaptive_model_clone` are bound above, and the per-node fps
// EMA was already advanced in the de-confounding gate; just
// re-borrow the node state here.
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
// ADR-110 iter 19 — feed the per-node fps EMA from real
// CSI arrivals. The helper sets `last_frame_time` as a
// side effect, so the previous bare assignment is gone.
ns.observe_csi_frame_arrival(std::time::Instant::now());

// ADR-084 Pass 3: cluster-Pi novelty sensor.
// Score this frame's feature vector against the per-node
Expand Down Expand Up @@ -7588,3 +7688,173 @@ mod export_rvf_mode_tests {
assert!(!export_emits_placeholder_demo(false, true, false));
}
}

/// Tests for the CSI de-confounding gate (occupancy-blind fix). Real promiscuous
/// capture interleaves structureless "null" frames with real CSI frames at the
/// same width/RSSI; diffing them fabricates temporal "motion". The gate excludes
/// null frames so the feature pipeline sees a consistent stream.
#[cfg(test)]
mod deconfound_tests {
use super::*;

fn flat(n: usize, level: f64) -> Vec<f64> {
vec![level; n]
}
fn rich(n: usize, seed: f64) -> Vec<f64> {
(0..n).map(|i| 20.0 + 15.0 * ((i as f64) * 0.4 + seed).sin()).collect()
}

#[test]
fn cov2_zero_for_flat_large_for_structured_and_scale_invariant() {
assert!(frame_structure_cov2(&flat(56, 20.0)) < 1e-9, "constant vector → no structure");
assert!(frame_structure_cov2(&rich(56, 0.0)) > 0.1, "multipath vector → real structure");
let a = rich(56, 0.7);
let a5: Vec<f64> = a.iter().map(|x| x * 5.0).collect();
assert!(
(frame_structure_cov2(&a) - frame_structure_cov2(&a5)).abs() < 1e-9,
"CoV² is scale-invariant (independent of amplitude gain)"
);
}

#[test]
fn gate_excludes_null_frames_after_warmup() {
let mut ns = NodeState::new();
// warm the P95 on rich frames so it knows the node's real level
for j in 0..DECONFOUND_MIN_SAMPLES + 4 {
ns.admit_frame_structure(frame_structure_cov2(&rich(56, j as f64)));
}
assert!(ns.admit_frame_structure(frame_structure_cov2(&rich(56, 99.0))), "rich admitted");
let before = ns.off_structure_frames;
assert!(!ns.admit_frame_structure(frame_structure_cov2(&flat(56, 20.0))), "null excluded");
assert_eq!(ns.off_structure_frames, before + 1, "exclusion is counted");
}

#[test]
fn gating_removes_fabricated_temporal_motion() {
// frame-to-frame normalized diff energy (firmware temporal_motion_score form)
let motion = |frames: &[Vec<f64>]| -> f64 {
let (mut acc, mut k) = (0.0, 0usize);
for w in frames.windows(2) {
let (a, b) = (&w[0], &w[1]);
let m: f64 = a.iter().sum::<f64>() / a.len() as f64;
if m == 0.0 { continue; }
let de: f64 = a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum::<f64>() / a.len() as f64;
acc += (de / (m * m)).sqrt();
k += 1;
}
if k > 0 { acc / k as f64 } else { 0.0 }
};
// alternating null/rich stream = the real-data pathology
let stream: Vec<Vec<f64>> = (0..200)
.map(|i| if i % 2 == 0 { rich(56, (i / 2) as f64 * 0.01) } else { flat(56, 20.0) })
.collect();
let confounded = motion(&stream);

let mut ns = NodeState::new();
for j in 0..DECONFOUND_MIN_SAMPLES + 4 {
ns.admit_frame_structure(frame_structure_cov2(&rich(56, j as f64)));
}
let admitted: Vec<Vec<f64>> = stream
.iter()
.filter(|a| ns.admit_frame_structure(frame_structure_cov2(a)))
.cloned()
.collect();
let deconfounded = motion(&admitted);
assert!(
deconfounded < confounded / 10.0,
"de-confounding must collapse fabricated motion: confounded={confounded:.3} deconfounded={deconfounded:.3}"
);
}
}

/// Regression tests for the IPEX-antenna saturation bug: `motion_score` must be
/// invariant to absolute signal amplitude so that adding external antennas (which
/// raised amplitudes ~5x) does not peg the presence terms at their 1.0 clamp and
/// make an empty room read "present". See `extract_features_from_frame`.
#[cfg(test)]
mod motion_score_antenna_tests {
use super::*;
use std::collections::VecDeque;

/// Minimal Esp32Frame carrying the given amplitude vector.
fn frame_with(amps: Vec<f64>) -> Esp32Frame {
let n = amps.len() as u8;
Esp32Frame {
magic: 0,
node_id: 1,
n_antennas: 1,
n_subcarriers: n,
freq_mhz: 2437,
sequence: 0,
rssi: -40,
noise_floor: -90,
phases: vec![0.0; amps.len()],
amplitudes: amps,
}
}

/// Deterministic 52-subcarrier amplitude pattern. `scale` multiplies the whole
/// signal (antenna gain); `jitter` controls per-frame deviation (motion); `seed`
/// varies the jitter from frame to frame.
fn pattern(scale: f64, jitter: f64, seed: f64) -> Vec<f64> {
(0..52)
.map(|i| {
let base = 18.0 + 6.0 * ((i as f64) * 0.3).sin();
let wobble = jitter * ((i as f64) * 0.7 + seed).sin();
(base + wobble) * scale
})
.collect()
}

/// motion_score (5th tuple element) for `current` preceded by `history` frames.
fn motion_score_for(history: &[Vec<f64>], current: Vec<f64>) -> f64 {
let hist: VecDeque<Vec<f64>> = history.iter().cloned().collect();
let (_f, _c, _b, _sv, motion_score) =
extract_features_from_frame(&frame_with(current), &hist, 10.0);
motion_score
}

#[test]
fn motion_score_is_amplitude_scale_invariant() {
// The core fix: an identical temporal pattern at 1x vs 5x amplitude (the
// antenna-gain regime) must yield the SAME motion_score. Every term is now a
// dimensionless ratio (variance/amp^2, mbp/amp^2, temporal diff/amp^2) or a
// relative-threshold count (change_points, dominant_freq), so amplitude cancels.
for &jitter in &[0.0_f64, 1.5, 5.0] {
let hist_1x: Vec<Vec<f64>> =
(0..6).map(|s| pattern(1.0, jitter, s as f64)).collect();
let hist_5x: Vec<Vec<f64>> =
(0..6).map(|s| pattern(5.0, jitter, s as f64)).collect();
let s1 = motion_score_for(&hist_1x, pattern(1.0, jitter, 6.0));
let s5 = motion_score_for(&hist_5x, pattern(5.0, jitter, 6.0));
assert!(
(s1 - s5).abs() < 1e-9,
"motion_score must be amplitude-invariant (jitter={jitter}): 1x={s1}, 5x={s5}"
);
}
}

#[test]
fn high_amplitude_quiet_signal_does_not_saturate() {
// A near-stationary signal at antenna-level amplitude must not peg motion_score
// at 1.0 (the pre-fix bug), and must score clearly below a moving signal at the
// SAME amplitude — proving dynamic range to distinguish empty from occupied.
let scale = 5.0;
let quiet = motion_score_for(
&(0..6).map(|s| pattern(scale, 0.05, s as f64)).collect::<Vec<_>>(),
pattern(scale, 0.05, 6.0),
);
let moving = motion_score_for(
&(0..6).map(|s| pattern(scale, 8.0, (s * 13) as f64)).collect::<Vec<_>>(),
pattern(scale, 8.0, 91.0),
);
assert!(
quiet < 0.5,
"quiet high-amplitude signal must not saturate, got {quiet}"
);
assert!(
moving > quiet + 0.1,
"moving must score above quiet (range preserved): quiet={quiet}, moving={moving}"
);
}
}