From 42d085e45dc885c50eb693444bce1a79150c9f85 Mon Sep 17 00:00:00 2001 From: DenzelPenzel Date: Wed, 27 May 2026 21:23:35 +0100 Subject: [PATCH] statement-store: wire peer topology from network events --- substrate/client/network/src/event.rs | 12 + substrate/client/network/src/litep2p/mod.rs | 8 +- substrate/client/network/src/service.rs | 14 +- .../client/network/src/service/out_events.rs | 16 + substrate/client/network/statement/Cargo.toml | 2 +- substrate/client/network/statement/src/lib.rs | 184 ++++++- .../client/network/statement/src/v2dht/mod.rs | 19 + .../statement/src/v2dht/peers_topology.rs | 507 ++++++++++++++++++ 8 files changed, 745 insertions(+), 17 deletions(-) create mode 100644 substrate/client/network/statement/src/v2dht/mod.rs create mode 100644 substrate/client/network/statement/src/v2dht/peers_topology.rs diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index bb4fa921c9493..56cfee70c6fd5 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -22,6 +22,7 @@ use crate::types::ProtocolName; use bytes::Bytes; +use std::collections::HashSet; use sc_network_common::role::ObservedRole; use sc_network_types::{ @@ -81,6 +82,17 @@ pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), + /// The network discovered a peer through routing-table/discovery mechanisms. + PeerDiscovered(PeerId), + + /// The network identified a peer and learned the protocols it supports. + PeerIdentified { + /// Identified peer. + peer: PeerId, + /// Protocols reported by the peer. + supported_protocols: HashSet, + }, + /// Opened a substream with the given node with the given notifications protocol. /// /// The protocol is always one of the notification protocols that have been registered. diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 2fa51a9465cf2..6541d4e4f44be 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -816,7 +816,9 @@ impl NetworkBackend for Litep2pNetworkBac } Some(DiscoveryEvent::RoutingTableUpdate { peers }) => { for peer in peers { - self.peerstore_handle.add_known_peer(peer.into()); + let peer = peer.into(); + self.peerstore_handle.add_known_peer(peer); + self.event_streams.send(Event::PeerDiscovered(peer)); } } Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => { @@ -1119,6 +1121,10 @@ impl NetworkBackend for Litep2pNetworkBac } } Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => { + self.event_streams.send(Event::PeerIdentified { + peer: peer.into(), + supported_protocols: supported_protocols.iter().cloned().map(Into::into).collect(), + }); self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await; } Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => { diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 4e9140250d798..31434be4706b4 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -1617,10 +1617,20 @@ where addr.clone(), ); } - self.peer_store_handle.add_known_peer(peer_id.into()); + let peer = peer_id.into(); + self.peer_store_handle.add_known_peer(peer); + self.event_streams.send(Event::PeerIdentified { + peer, + supported_protocols: protocols + .into_iter() + .map(|protocol| ProtocolName::from(protocol.to_string())) + .collect(), + }); }, SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => { - self.peer_store_handle.add_known_peer(peer_id.into()); + let peer = peer_id.into(); + self.peer_store_handle.add_known_peer(peer); + self.event_streams.send(Event::PeerDiscovered(peer)); }, SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => { if let Some(metrics) = self.metrics.as_ref() { diff --git a/substrate/client/network/src/service/out_events.rs b/substrate/client/network/src/service/out_events.rs index 5893db8503d69..f3d78f4f2ca0b 100644 --- a/substrate/client/network/src/service/out_events.rs +++ b/substrate/client/network/src/service/out_events.rs @@ -294,6 +294,12 @@ impl Metrics { Event::Dht(_) => { self.events_total.with_label_values(&["dht", "sent", name]).inc(); }, + Event::PeerDiscovered(_) => { + self.events_total.with_label_values(&["peer-discovered", "sent", name]).inc(); + }, + Event::PeerIdentified { .. } => { + self.events_total.with_label_values(&["peer-identified", "sent", name]).inc(); + }, Event::NotificationStreamOpened { protocol, .. } => { format_label("notif-open-", protocol, |protocol_label| { self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); @@ -322,6 +328,16 @@ impl Metrics { Event::Dht(_) => { self.events_total.with_label_values(&["dht", "received", name]).inc(); }, + Event::PeerDiscovered(_) => { + self.events_total + .with_label_values(&["peer-discovered", "received", name]) + .inc(); + }, + Event::PeerIdentified { .. } => { + self.events_total + .with_label_values(&["peer-identified", "received", name]) + .inc(); + }, Event::NotificationStreamOpened { protocol, .. } => { format_label("notif-open-", protocol, |protocol_label| { self.events_total.with_label_values(&[protocol_label, "received", name]).inc(); diff --git a/substrate/client/network/statement/Cargo.toml b/substrate/client/network/statement/Cargo.toml index baee02bf7ead0..60053b28945da 100644 --- a/substrate/client/network/statement/Cargo.toml +++ b/substrate/client/network/statement/Cargo.toml @@ -33,6 +33,7 @@ sc-network-common = { workspace = true, default-features = true } sc-network-sync = { workspace = true, default-features = true } sc-network-types = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } +sp-crypto-hashing = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-statement-store = { workspace = true, default-features = true } tokio = { workspace = true } @@ -45,7 +46,6 @@ sc-statement-store = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } -sp-crypto-hashing = { workspace = true, default-features = true } sp-keyring = { workspace = true, default-features = true } substrate-test-runtime-client = { workspace = true } tempfile = { workspace = true } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index 7791e70081d33..73d1120232cdd 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -27,38 +27,42 @@ //! `Future` that processes statements. mod affinity; +pub mod v2dht; -use crate::config::*; +use crate::{ + config::*, + v2dht::peers_topology::{PeersTopology, PeersTopologyConfig}, +}; use affinity::AffinityFilter; use codec::{Compact, Decode, Encode, MaxEncodedLen}; use futures::{ channel::oneshot, - future::{pending, FusedFuture}, + future::{FusedFuture, pending}, prelude::*, stream::FuturesUnordered, }; use governor::{ + Quota, RateLimiter, clock::DefaultClock, state::{InMemoryState, NotKeyed}, - Quota, RateLimiter, }; use prometheus_endpoint::{ - exponential_buckets, register, Counter, Gauge, GaugeVec, Histogram, HistogramOpts, Opts, - PrometheusError, Registry, U64, + Counter, Gauge, GaugeVec, Histogram, HistogramOpts, Opts, PrometheusError, Registry, U64, + exponential_buckets, register, }; use rand::seq::IteratorRandom; use sc_network::{ + Event, NetworkBackend, NetworkEventStream, NetworkPeers, NetworkStateInfo, config::{NonReservedPeerMode, SetConfig}, error, multiaddr, peer_store::PeerStoreProvider, service::{ - traits::{NotificationEvent, NotificationService, ValidationResult}, NotificationMetrics, + traits::{NotificationEvent, NotificationService, ValidationResult}, }, types::ProtocolName, - utils::{interval, LruHashSet}, - NetworkBackend, NetworkEventStream, NetworkPeers, + utils::{LruHashSet, interval}, }; use sc_network_sync::{SyncEvent, SyncEventStream}; use sc_network_types::PeerId; @@ -67,7 +71,7 @@ use sp_statement_store::{ FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult, }; use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque, hash_map::Entry}, iter, num::{NonZeroU32, NonZeroUsize}, pin::Pin, @@ -367,7 +371,7 @@ impl StatementHandlerPrototype { /// Important: the statements handler is initially disabled and doesn't gossip statements. /// Gossiping is enabled when major syncing is done. pub fn build< - N: NetworkPeers + NetworkEventStream, + N: NetworkPeers + NetworkEventStream + NetworkStateInfo, S: SyncEventStream + sp_consensus::SyncOracle, >( self, @@ -432,6 +436,10 @@ impl StatementHandlerPrototype { ); } + let network_event_stream = network.event_stream("statement-handler-network"); + let peers_topology = + PeersTopology::new(network.local_peer_id(), true, PeersTopologyConfig::default()); + let handler = StatementHandler { protocol_name: self.protocol_name, notification_service: self.notification_service, @@ -443,6 +451,8 @@ impl StatementHandlerPrototype { network, sync, sync_event_stream: sync_event_stream.fuse(), + network_event_stream: network_event_stream.fuse(), + peers_topology, peers: HashMap::new(), statement_store, queue_sender, @@ -466,7 +476,7 @@ impl StatementHandlerPrototype { /// Handler for statements. Call [`StatementHandler::run`] to start the processing. pub struct StatementHandler< - N: NetworkPeers + NetworkEventStream, + N: NetworkPeers + NetworkEventStream + NetworkStateInfo, S: SyncEventStream + sp_consensus::SyncOracle, > { protocol_name: ProtocolName, @@ -486,6 +496,10 @@ pub struct StatementHandler< sync: S, /// Receiver for syncing-related events. sync_event_stream: stream::Fuse + Send>>>, + /// Receiver for network topology events. + network_event_stream: stream::Fuse + Send>>>, + /// Local view of statement-store peers known through network topology events. + peers_topology: PeersTopology, /// Notification service. notification_service: Box, // All connected peers @@ -686,7 +700,7 @@ impl Peer { impl StatementHandler where - N: NetworkPeers + NetworkEventStream, + N: NetworkPeers + NetworkEventStream + NetworkStateInfo, S: SyncEventStream + sp_consensus::SyncOracle, { /// Create a new `StatementHandler` for testing/benchmarking purposes. @@ -703,6 +717,8 @@ where queue_sender: async_channel::Sender<(Statement, oneshot::Sender)>, statements_per_second: NonZeroU32, ) -> Self { + let local_peer = network.local_peer_id(); + Self { protocol_name, notification_service, @@ -712,6 +728,10 @@ where network, sync, sync_event_stream, + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new(local_peer, true, PeersTopologyConfig::default()), peers, statement_store, queue_sender, @@ -765,6 +785,14 @@ where return; } } + network_event = self.network_event_stream.next() => { + if let Some(network_event) = network_event { + self.handle_network_event(network_event); + } else { + // Network event stream has seemingly closed. Closing as well. + return; + } + } event = self.notification_service.next_event().fuse() => { if let Some(event) = event { self.handle_notification_event(event).await @@ -796,6 +824,16 @@ where } } + fn handle_network_event(&mut self, event: Event) { + match event { + Event::PeerDiscovered(peer) => self.peers_topology.note_seen(peer), + Event::PeerIdentified { peer, supported_protocols } => self + .peers_topology + .note_identified(peer, supported_protocols.contains(&self.protocol_name)), + _ => {}, + } + } + /// Send a single chunk of statements to a peer. /// /// Encodes the chunk according to the peer's protocol version: @@ -1000,6 +1038,7 @@ where return; }; let is_light = peer_role.is_light(); + let dht_eligible = protocol_version == PeerProtocolVersion::V2 && !is_light; log::debug!( target: LOG_TARGET, "Peer {peer} connected with statement protocol {protocol_version:?}, role={peer_role:?}" @@ -1025,6 +1064,7 @@ where }, ); debug_assert!(_was_in.is_none()); + self.peers_topology.on_peer_connected(peer, dht_eligible); self.metrics.as_ref().map(|metrics| { if let Some(peer) = self.peers.get(&peer) { @@ -1039,6 +1079,7 @@ where } }, NotificationEvent::NotificationStreamClosed { peer } => { + self.peers_topology.on_peer_disconnected(peer); let removed_peer = self.peers.remove(&peer); debug_assert!(removed_peer.is_some()); @@ -1517,8 +1558,8 @@ mod tests { use super::*; use std::sync::{ - atomic::{AtomicBool, Ordering}, Mutex, + atomic::{AtomicBool, Ordering}, }; /// Default seed used for bloom filters in tests. @@ -1526,6 +1567,7 @@ mod tests { #[derive(Clone)] struct TestNetwork { + local_peer: PeerId, reported_peers: Arc>>, disconnected_peers: Arc>>, /// Role to return from `peer_role`. Default: `Full`. @@ -1537,6 +1579,7 @@ mod tests { impl TestNetwork { fn new() -> Self { Self { + local_peer: PeerId::random(), reported_peers: Arc::new(Mutex::new(Vec::new())), disconnected_peers: Arc::new(Mutex::new(Vec::new())), default_role: sc_network::ObservedRole::Full, @@ -1547,6 +1590,7 @@ mod tests { fn new_light() -> Self { Self { + local_peer: PeerId::random(), reported_peers: Arc::new(Mutex::new(Vec::new())), disconnected_peers: Arc::new(Mutex::new(Vec::new())), default_role: sc_network::ObservedRole::Light, @@ -1691,6 +1735,20 @@ mod tests { } } + impl NetworkStateInfo for TestNetwork { + fn external_addresses(&self) -> Vec { + Vec::new() + } + + fn listen_addresses(&self) -> Vec { + Vec::new() + } + + fn local_peer_id(&self) -> PeerId { + self.local_peer + } + } + impl NetworkEventStream for TestNetwork { fn event_stream( &self, @@ -1966,6 +2024,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers, statement_store: Arc::new(statement_store.clone()), queue_sender, @@ -1994,6 +2060,41 @@ mod tests { .collect() } + #[tokio::test] + async fn network_events_update_statement_peer_topology() { + let (mut handler, _statement_store, _network, _notification_service, _, _) = + build_handler(0); + let peer = PeerId::random(); + let topic = [1; 32]; + + handler.handle_network_event(Event::PeerDiscovered(peer)); + handler.handle_network_event(Event::PeerIdentified { + peer, + supported_protocols: std::iter::once(handler.protocol_name.clone()).collect(), + }); + + assert_eq!(handler.peers_topology.known_peers_count(), 1); + assert!(handler.peers_topology.closest_known(topic, 1).is_empty()); + + handler + .handle_notification_event(NotificationEvent::NotificationStreamOpened { + peer, + direction: sc_network::service::traits::Direction::Inbound, + handshake: vec![], + negotiated_fallback: None, + }) + .await; + + assert!(handler.peers_topology.is_connected(&peer)); + assert_eq!(handler.peers_topology.closest_known(topic, 1), vec![peer]); + + handler + .handle_notification_event(NotificationEvent::NotificationStreamClosed { peer }) + .await; + + assert!(!handler.peers_topology.is_connected(&peer)); + } + /// Simulate the network closing the substream for every disconnected /// peer, so the handler runs its per-peer cleanup. async fn dispatch_disconnects( @@ -2201,6 +2302,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers: HashMap::new(), statement_store: Arc::new(statement_store.clone()), queue_sender, @@ -2244,6 +2353,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers: HashMap::new(), statement_store: Arc::new(statement_store.clone()), queue_sender, @@ -3661,6 +3778,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers: HashMap::new(), statement_store: Arc::new(statement_store.clone()), queue_sender, @@ -4016,6 +4141,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers: HashMap::new(), statement_store: Arc::new(statement_store), queue_sender, @@ -4081,6 +4214,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers: HashMap::new(), statement_store: Arc::new(statement_store), queue_sender, @@ -4158,6 +4299,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + network.local_peer_id(), + true, + PeersTopologyConfig::default(), + ), peers, statement_store: Arc::new(statement_store), queue_sender, @@ -4243,6 +4392,7 @@ mod tests { |network: TestNetwork, dropped: bool| -> StatementHandler { let (sync, _) = TestSync::with_syncing(false); let (queue_sender, _) = async_channel::bounded(2); + let local_peer = network.local_peer_id(); let mut peers = HashMap::new(); peers.insert(PeerId::random(), make_peer()); StatementHandler { @@ -4258,6 +4408,14 @@ mod tests { sync_event_stream: (Box::pin(futures::stream::pending()) as Pin + Send>>) .fuse(), + network_event_stream: (Box::pin(futures::stream::pending()) + as Pin + Send>>) + .fuse(), + peers_topology: PeersTopology::new( + local_peer, + true, + PeersTopologyConfig::default(), + ), peers, statement_store: Arc::new(TestStatementStore::new()), queue_sender, diff --git a/substrate/client/network/statement/src/v2dht/mod.rs b/substrate/client/network/statement/src/v2dht/mod.rs new file mode 100644 index 0000000000000..06d37679f94a1 --- /dev/null +++ b/substrate/client/network/statement/src/v2dht/mod.rs @@ -0,0 +1,19 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +pub mod peers_topology; diff --git a/substrate/client/network/statement/src/v2dht/peers_topology.rs b/substrate/client/network/statement/src/v2dht/peers_topology.rs new file mode 100644 index 0000000000000..8e29727bdfa18 --- /dev/null +++ b/substrate/client/network/statement/src/v2dht/peers_topology.rs @@ -0,0 +1,507 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use sc_network_types::PeerId; +pub use sp_statement_store::Hash as TopicHash; +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + num::NonZeroUsize, +}; + +#[derive(Debug, Clone)] +pub struct PeersTopologyConfig { + /// Number of DHT-eligible statement peers responsible for storing a topic. + pub replication_factor: NonZeroUsize, + /// Maximum number of DHT forwarding targets returned for one topic. + pub gossip_target: NonZeroUsize, + /// Multiplier for per-topic candidate pools in `peers_for_topics`. + pub candidate_multiplier: NonZeroUsize, +} + +impl Default for PeersTopologyConfig { + fn default() -> Self { + Self { + replication_factor: NonZeroUsize::new(20).expect("20 is non-zero"), + gossip_target: NonZeroUsize::new(3).expect("3 is non-zero"), + candidate_multiplier: NonZeroUsize::new(3).expect("3 is non-zero"), + } + } +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +enum ProtocolSupport { + #[default] + Unknown, + Supported, + Unsupported, +} + +impl ProtocolSupport { + fn from_supported(supported: bool) -> Self { + if supported { Self::Supported } else { Self::Unsupported } + } +} + +#[derive(Debug, Clone, Default)] +struct PeerInfo { + statement_protocol: ProtocolSupport, + dht_eligible: bool, + connected: bool, +} + +impl PeerInfo { + fn is_dht_candidate(&self) -> bool { + self.statement_protocol == ProtocolSupport::Supported && self.dht_eligible + } + + fn is_connected_dht_candidate(&self) -> bool { + self.connected && self.is_dht_candidate() + } +} + +/// Pure, event-fed local view of statement-store peers. +/// +/// The topology is built from peers learned through network discovery, identify metadata and +/// statement notification connections. It computes XOR distances locally over that learned peer +/// set; it does not issue topic-specific Kademlia lookups. +#[derive(Debug, Clone)] +pub struct PeersTopology { + local_peer: PeerId, + local_dht_eligible: bool, + config: PeersTopologyConfig, + peers: HashMap, +} + +impl PeersTopology { + pub fn new(local_peer: PeerId, local_dht_eligible: bool, config: PeersTopologyConfig) -> Self { + Self { local_peer, local_dht_eligible, config, peers: HashMap::new() } + } + + /// Record that discovery or a routing-table source saw `peer`. + pub fn note_seen(&mut self, peer: PeerId) { + if peer != self.local_peer { + self.peer_info(peer); + } + } + + /// Record statement-protocol support from identify/protocol metadata. + pub fn note_identified(&mut self, peer: PeerId, supports_statement_protocol: bool) { + if peer == self.local_peer { + return; + } + + let peer_info = self.peer_info(peer); + peer_info.statement_protocol = ProtocolSupport::from_supported(supports_statement_protocol); + if !supports_statement_protocol { + peer_info.dht_eligible = false; + } + } + + /// Record whether `peer` may store and route statement DHT data. + pub fn note_dht_eligibility(&mut self, peer: PeerId, dht_eligible: bool) { + if peer == self.local_peer { + return; + } + + self.peer_info(peer).dht_eligible = dht_eligible; + } + + /// Record that the statement notification substream opened. + pub fn on_peer_connected(&mut self, peer: PeerId, dht_eligible: bool) { + if peer == self.local_peer { + return; + } + + let peer_info = self.peer_info(peer); + peer_info.statement_protocol = ProtocolSupport::Supported; + peer_info.dht_eligible = dht_eligible; + peer_info.connected = true; + } + + /// Record that the statement notification substream closed. + pub fn on_peer_disconnected(&mut self, peer: PeerId) { + if let Some(peer_info) = self.peers.get_mut(&peer) { + peer_info.connected = false; + } + } + + /// Remove all topology state for `peer`. + pub fn note_peer_removed(&mut self, peer: PeerId) { + self.peers.remove(&peer); + } + + /// Connected statement-protocol peers, returned in deterministic order. + pub fn connected_peers(&self) -> impl Iterator + '_ { + let mut peers = self + .peers + .iter() + .filter(|(_, peer_info)| peer_info.connected) + .map(|(peer, _)| *peer) + .collect::>(); + peers.sort(); + peers.into_iter() + } + + /// Returns whether `peer` currently has an open statement-protocol substream. + pub fn is_connected(&self, peer: &PeerId) -> bool { + self.peers.get(peer).is_some_and(|peer_info| peer_info.connected) + } + + /// Number of known remote peers, including non-DHT-eligible statement peers. + pub fn known_peers_count(&self) -> usize { + self.peers.len() + } + + /// Closest known DHT-eligible statement peers for `topic`. + /// + /// "Closest" is computed over the locally learned DHT-eligible statement peers, not by querying + /// the network for the true global closest peers. + pub fn closest_known(&self, topic: TopicHash, n: usize) -> Vec { + let mut peers = self.dht_candidates().collect::>(); + peers.sort_by(|a, b| cmp_distance_then_peer(topic, a, b)); + peers.truncate(n); + peers + } + + /// Returns whether the local node is one of the closest DHT storage replicas for `topic`. + pub fn is_dht_affine(&self, topic: TopicHash) -> bool { + if !self.local_dht_eligible { + return false; + } + + let mut candidates = self.dht_candidates().collect::>(); + candidates.push(self.local_peer); + candidates.sort_by(|a, b| cmp_distance_then_peer(topic, a, b)); + candidates + .into_iter() + .take(self.config.replication_factor.get()) + .any(|peer| peer == self.local_peer) + } + + /// Closest connected DHT-eligible statement peers for `topic`. + pub fn routing_targets(&self, topic: TopicHash) -> Vec { + if !self.local_dht_eligible { + return Vec::new(); + } + + let mut peers = self + .peers + .iter() + .filter(|(_, peer_info)| peer_info.is_connected_dht_candidate()) + .map(|(peer, _)| *peer) + .collect::>(); + + peers.sort_by(|a, b| cmp_distance_then_peer(topic, a, b)); + peers.truncate(self.config.gossip_target.get()); + peers + } + + /// Local-only explicit-affinity connection candidates for `topics`. + pub fn peers_for_topics(&self, topics: &[TopicHash]) -> Vec { + if topics.is_empty() { + return Vec::new(); + } + + let pool_size = + self.config.replication_factor.get() * self.config.candidate_multiplier.get(); + + let closest_pools = topics + .iter() + .map(|topic| self.closest_known(*topic, pool_size)) + .collect::>(); + + let mut uncovered = closest_pools + .iter() + .enumerate() + .filter_map(|(topic_idx, pool)| { + (!pool.iter().any(|peer| self.is_connected(peer))).then_some(topic_idx) + }) + .collect::>(); + + let pools = closest_pools + .into_iter() + .map(|pool| { + pool.into_iter() + .filter(|peer| *peer != self.local_peer && !self.is_connected(peer)) + .collect::>() + }) + .collect::>(); + + let mut selected = Vec::new(); + let limit = topics.len() * self.config.candidate_multiplier.get(); + + while !uncovered.is_empty() && selected.len() < limit { + let Some(best_peer) = self.best_candidate(topics, &pools, &uncovered, &selected) else { + break; + }; + + selected.push(best_peer); + uncovered.retain(|topic_idx| !pools[*topic_idx].contains(&best_peer)); + } + + selected + } + + fn peer_info(&mut self, peer: PeerId) -> &mut PeerInfo { + self.peers.entry(peer).or_default() + } + + fn dht_candidates(&self) -> impl Iterator + '_ { + self.peers + .iter() + .filter(|(_, peer_info)| peer_info.is_dht_candidate()) + .map(|(peer, _)| *peer) + } + + fn best_candidate( + &self, + topics: &[TopicHash], + pools: &[Vec], + uncovered: &HashSet, + selected: &[PeerId], + ) -> Option { + let mut candidates = pools + .iter() + .flat_map(|pool| pool.iter().copied()) + .filter(|peer| !selected.contains(peer)) + .collect::>() + .into_iter() + .filter_map(|peer| { + let mut covering_topics = uncovered + .iter() + .copied() + .filter(|topic_idx| pools[*topic_idx].contains(&peer)) + .collect::>(); + if covering_topics.is_empty() { + return None; + } + + covering_topics.sort_by(|a, b| { + distance_to(topics[*a], &peer).cmp(&distance_to(topics[*b], &peer)) + }); + let best_distance = distance_to(topics[covering_topics[0]], &peer); + Some((peer, covering_topics.len(), best_distance)) + }) + .collect::>(); + + candidates.sort_by(|(peer_a, score_a, distance_a), (peer_b, score_b, distance_b)| { + score_b + .cmp(score_a) + .then_with(|| distance_a.cmp(distance_b)) + .then_with(|| peer_a.cmp(peer_b)) + }); + candidates.first().map(|(peer, _, _)| *peer) + } +} + +fn cmp_distance_then_peer(topic: TopicHash, a: &PeerId, b: &PeerId) -> Ordering { + distance_to(topic, a).cmp(&distance_to(topic, b)).then_with(|| a.cmp(b)) +} + +fn distance_to(topic: TopicHash, peer: &PeerId) -> [u8; 32] { + xor_distance(topic, peer_key(peer)) +} + +fn peer_key(peer: &PeerId) -> [u8; 32] { + sp_crypto_hashing::blake2_256(&peer.to_bytes()) +} + +fn xor_distance(a: [u8; 32], b: [u8; 32]) -> [u8; 32] { + let mut distance = [0; 32]; + for ((distance, a), b) in distance.iter_mut().zip(a).zip(b) { + *distance = a ^ b; + } + distance +} + +#[cfg(test)] +mod tests { + use super::*; + use std::num::NonZeroUsize; + + fn config(replication_factor: usize, gossip_target: usize) -> PeersTopologyConfig { + PeersTopologyConfig { + replication_factor: NonZeroUsize::new(replication_factor).expect("non-zero"), + gossip_target: NonZeroUsize::new(gossip_target).expect("non-zero"), + candidate_multiplier: NonZeroUsize::new(3).expect("non-zero"), + } + } + + fn peer(seed: u8) -> PeerId { + let mut bytes = [seed; 34]; + bytes[0] = 0; + bytes[1] = 32; + PeerId::from_bytes(&bytes).expect("identity multihash peer id") + } + + fn topology(local_seed: u8) -> PeersTopology { + PeersTopology::new(peer(local_seed), true, config(2, 2)) + } + + fn topic(seed: u8) -> TopicHash { + [seed; 32] + } + + fn dht_peer(topology: &mut PeersTopology, peer: PeerId) { + topology.note_seen(peer); + topology.note_identified(peer, true); + topology.note_dht_eligibility(peer, true); + } + + fn known_dht_peers(topology: &PeersTopology, topic: TopicHash) -> Vec { + topology.closest_known(topic, topology.known_peers_count()) + } + + #[test] + fn new_topology_starts_empty() { + let topology = topology(1); + + assert_eq!(topology.known_peers_count(), 0); + assert_eq!(topology.connected_peers().count(), 0); + assert!(!topology.is_connected(&peer(2))); + } + + #[test] + fn lifecycle_mutators_are_idempotent_and_filter_dht_eligibility() { + let mut topology = topology(1); + let full = peer(2); + let light = peer(3); + + dht_peer(&mut topology, full); + dht_peer(&mut topology, full); + topology.note_seen(light); + topology.note_identified(light, true); + topology.note_dht_eligibility(light, false); + topology.on_peer_connected(full, true); + topology.on_peer_connected(light, false); + + assert_eq!(topology.known_peers_count(), 2); + assert!(topology.is_connected(&full)); + assert!(topology.is_connected(&light)); + assert_eq!(known_dht_peers(&topology, topic(9)), vec![full]); + + topology.note_identified(full, false); + assert!(known_dht_peers(&topology, topic(9)).is_empty()); + + topology.on_peer_disconnected(full); + assert!(!topology.is_connected(&full)); + assert_eq!(topology.known_peers_count(), 2); + + topology.note_peer_removed(light); + assert_eq!(topology.known_peers_count(), 1); + assert!(!topology.is_connected(&light)); + } + + #[test] + fn storage_affinity_uses_known_dht_peers_and_self() { + let mut topology = topology(10); + let peers = (11..=20).map(peer).collect::>(); + + for peer in &peers { + dht_peer(&mut topology, *peer); + } + + let topic = topic(42); + let mut responsible = known_dht_peers(&topology, topic); + responsible.push(peer(10)); + responsible.sort_by(|a, b| cmp_distance_then_peer(topic, a, b)); + let expected = responsible.into_iter().take(2).any(|p| p == peer(10)); + + assert_eq!(topology.is_dht_affine(topic), expected); + } + + #[test] + fn non_dht_local_node_is_never_affine_and_does_not_route() { + let mut topology = PeersTopology::new(peer(1), false, config(2, 2)); + let remote = peer(2); + + dht_peer(&mut topology, remote); + topology.on_peer_connected(remote, true); + + assert!(!topology.is_dht_affine(topic(1))); + assert!(topology.routing_targets(topic(1)).is_empty()); + } + + #[test] + fn routing_targets_are_closest_connected_dht_peers_even_when_farther_than_self() { + let mut topology = topology(1); + let topic = topic(7); + let self_distance = distance_to(topic, &peer(1)); + let mut farther = Vec::new(); + + for seed in 2..=80 { + let candidate = peer(seed); + if distance_to(topic, &candidate) > self_distance { + farther.push(candidate); + } + if farther.len() == 3 { + break; + } + } + assert!(farther.len() >= 3, "test peer fixture must include peers farther than self"); + + for peer in &farther { + dht_peer(&mut topology, *peer); + topology.on_peer_connected(*peer, true); + } + + let expected = { + let mut peers = farther.clone(); + peers.sort_by(|a, b| cmp_distance_then_peer(topic, a, b)); + peers.truncate(2); + peers + }; + + assert_eq!(topology.routing_targets(topic), expected); + } + + #[test] + fn peers_for_topics_does_not_select_for_topics_covered_by_connected_peers() { + let mut topology = topology(1); + let topic = topic(9); + + for peer in (2..=10).map(peer) { + dht_peer(&mut topology, peer); + } + + let connected = topology.closest_known(topic, 1)[0]; + topology.on_peer_connected(connected, true); + + assert!(topology.peers_for_topics(&[topic]).is_empty()); + } + + #[test] + fn peers_for_topics_is_deterministic_local_set_cover() { + let mut topology = topology(1); + let peers = (2..=30).map(peer).collect::>(); + let topics = [topic(1), topic(9), topic(17)]; + + for peer in &peers { + dht_peer(&mut topology, *peer); + } + + let first = topology.peers_for_topics(&topics); + let second = topology.peers_for_topics(&topics); + + assert_eq!(first, second); + assert!(!first.is_empty()); + assert!(first.len() <= topics.len() * 3); + assert!(first.iter().all(|peer| peers.contains(peer))); + } +}