From 27d7b18004a1cdc9ad944e436369f69bc11be0bc Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Wed, 17 Jun 2026 16:23:25 -0700 Subject: [PATCH] feat(spice): non-tracking validators pull witness for fallback --- .../src/spice/data_distributor_actor.rs | 75 ++++++- .../primitives/src/spice/chunk_endorsement.rs | 8 +- test-loop-tests/src/setup/drop_condition.rs | 17 ++ .../src/tests/spice/all_stake_fallback.rs | 188 ++++++++++++++++++ test-loop-tests/src/tests/spice/mod.rs | 1 + test-loop-tests/src/utils/network.rs | 26 +++ test-loop-tests/src/utils/node.rs | 7 + 7 files changed, 320 insertions(+), 2 deletions(-) create mode 100644 test-loop-tests/src/tests/spice/all_stake_fallback.rs diff --git a/chain/client/src/spice/data_distributor_actor.rs b/chain/client/src/spice/data_distributor_actor.rs index b56ca113712..a51e13539c4 100644 --- a/chain/client/src/spice/data_distributor_actor.rs +++ b/chain/client/src/spice/data_distributor_actor.rs @@ -17,7 +17,7 @@ use near_async::messaging::Handler; use near_async::messaging::Sender; use near_async::time::Duration; use near_chain::Block; -use near_chain::spice::core::SpiceCoreReader; +use near_chain::spice::core::{SpiceCoreReader, fallback_eligible}; use near_chain::spice::core_writer_actor::ProcessedBlock; use near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME; use near_chain_configs::MutableValidatorSigner; @@ -329,6 +329,9 @@ impl Handler for SpiceDataDistributorActor { impl Handler for SpiceDataDistributorActor { fn handle(&mut self, ProcessedBlock { block_hash }: ProcessedBlock) { + if let Err(err) = self.contribute_fallback_endorsements(&block_hash) { + tracing::error!(target: "spice_data_distribution", ?err, "failed contributing fallback endorsements"); + } if let Err(err) = self.start_waiting_on_data(&block_hash) { tracing::error!(target: "spice_data_distribution", ?err, ?block_hash, "failure when starting waiting on data"); } @@ -838,6 +841,76 @@ impl SpiceDataDistributorActor { // TODO(spice): Implement a state machine to track all the data we produce or may need. This // would help make sure that we cannot have and request data at the same time. + /// As a non-designated, non-tracking epoch validator, certify overdue chunks via the all-stake + /// fallback by pulling each chunk's witness so we can validate and endorse it. Re-evaluated per + /// block; once we've endorsed a chunk there's nothing more to do. + fn contribute_fallback_endorsements(&mut self, block_hash: &CryptoHash) -> Result<(), Error> { + let Some(signer) = self.validator_signer.get() else { + return Ok(()); + }; + let me = signer.validator_id(); + let block = self.chain_store.get_block(block_hash)?; + let carrying_height = block.header().height() + 1; + + for chunk_info in self.core_reader.get_uncertified_chunks(block_hash)? { + let chunk_id = &chunk_info.chunk_id; + if !fallback_eligible(&self.chain_store, carrying_height, block_hash, chunk_id)? { + continue; + } + let chunk_block = self.chain_store.get_block(&chunk_id.block_hash)?; + let epoch_id = chunk_block.header().epoch_id(); + if self.epoch_manager.get_validator_by_account_id(epoch_id, me).is_err() { + continue; + } + let assignments = self.epoch_manager.get_chunk_validator_assignments( + epoch_id, + chunk_id.shard_id, + chunk_block.header().height(), + )?; + if assignments.contains(me) { + continue; + } + // We already endorsed (the witness-validation flow broadcasts once eligible). + if self.core_reader.endorsement_exists(&chunk_id.block_hash, chunk_id.shard_id, me) { + continue; + } + // A non-tracker has no result to endorse; pull the witness so it can produce one. + let tracks_shard = self.shard_tracker.should_apply_chunk( + ApplyChunksMode::IsCaughtUp, + chunk_block.header().prev_hash(), + chunk_id.shard_id, + ); + if !tracks_shard { + self.start_waiting_on_fallback_witness(chunk_id, &chunk_block, me)?; + } + } + Ok(()) + } + + /// Pull a chunk's witness (not received by non-designated validators in the initial + /// distribution) so we can apply the chunk and endorse it for the fallback. + fn start_waiting_on_fallback_witness( + &mut self, + chunk_id: &SpiceChunkId, + chunk_block: &Block, + me: &AccountId, + ) -> Result<(), Error> { + let id = SpiceDataIdentifier::Witness { + block_hash: chunk_id.block_hash, + shard_id: chunk_id.shard_id, + }; + let (_recipients, producers) = self.recipients_and_producers(&id, chunk_block)?; + if producers.contains(me) + || self.waiting_on_data.contains_key(&id) + || self.recently_decoded_data.contains(&id) + || self.is_data_known(me, chunk_block, &id) + { + return Ok(()); + } + self.waiting_on_data.insert(id, HashMap::new()); + Ok(()) + } + fn start_waiting_on_data(&mut self, block_hash: &CryptoHash) -> Result<(), Error> { let signer = self.validator_signer.get(); let me = signer.as_ref().map(|signer| signer.validator_id()); diff --git a/core/primitives/src/spice/chunk_endorsement.rs b/core/primitives/src/spice/chunk_endorsement.rs index 6640de8a1af..ed6a2de68f4 100644 --- a/core/primitives/src/spice/chunk_endorsement.rs +++ b/core/primitives/src/spice/chunk_endorsement.rs @@ -6,7 +6,7 @@ use crate::validator_signer::ValidatorSigner; use borsh::{BorshDeserialize, BorshSerialize}; use near_crypto::{PublicKey, Signature}; use near_primitives_core::hash::CryptoHash; -use near_primitives_core::types::AccountId; +use near_primitives_core::types::{AccountId, ShardId}; use near_schema_checker_lib::ProtocolSchema; use std::fmt::Debug; @@ -53,6 +53,12 @@ impl SpiceChunkEndorsement { } } + pub fn shard_id(&self) -> ShardId { + match self { + Self::V1(v1) => v1.chunk_id.shard_id, + } + } + /// Checks signatures are returns SpiceVerifiedEndorsement if it's correct. /// NOTE: that it doesn't do any additional validations apart from checking signature. pub fn into_verified(self, public_key: &PublicKey) -> Option { diff --git a/test-loop-tests/src/setup/drop_condition.rs b/test-loop-tests/src/setup/drop_condition.rs index fa53e12d5f2..8f36fa730c8 100644 --- a/test-loop-tests/src/setup/drop_condition.rs +++ b/test-loop-tests/src/setup/drop_condition.rs @@ -2,6 +2,7 @@ use super::peer_manager_actor::NetworkRequestHandler; use super::state::NodeExecutionData; use crate::utils::network::{ block_dropper_by_height, chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash, + spice_designated_endorsement_dropper, }; use near_async::messaging::{CanSend, LateBoundSender}; use near_async::test_loop::data::TestLoopData; @@ -34,6 +35,10 @@ pub enum DropCondition { ChunksProducedByHeight(HashMap>), // Drops Block broadcast messages with height in `self.0` BlocksByHeight(HashSet), + /// Drops every SPICE chunk endorsement whose sender is designated for the endorsed chunk, + /// disabling the designated certification path so chunks can certify only via the all-stake + /// fallback. + DesignatedSpiceEndorsements, } /// Stores all chunks ever observed on chain. Determines if a chunk can be @@ -126,9 +131,21 @@ impl NodeExecutionData { DropCondition::BlocksByHeight(heights) => { self.register_drop_blocks_by_height(test_loop_data, heights); } + DropCondition::DesignatedSpiceEndorsements => { + self.register_drop_designated_spice_endorsements(test_loop_data); + } } } + fn register_drop_designated_spice_endorsements(&self, test_loop_data: &mut TestLoopData) { + let client_actor = test_loop_data.get(&self.client_sender.actor_handle()); + let epoch_manager = client_actor.client.chain.epoch_manager.clone(); + self.register_override_handler( + test_loop_data, + spice_designated_endorsement_dropper(epoch_manager), + ); + } + fn register_drop_chunks_validated_by( &self, test_loop_data: &mut TestLoopData, diff --git a/test-loop-tests/src/tests/spice/all_stake_fallback.rs b/test-loop-tests/src/tests/spice/all_stake_fallback.rs new file mode 100644 index 00000000000..5d04ac58114 --- /dev/null +++ b/test-loop-tests/src/tests/spice/all_stake_fallback.rs @@ -0,0 +1,188 @@ +use crate::setup::builder::TestLoopBuilder; +use crate::setup::drop_condition::DropCondition; +use crate::utils::node::TestLoopNode; +use itertools::Itertools; +use near_async::time::Duration; +use near_chain::spice::core::all_stake_fallback_assignment; +use near_chain_configs::Genesis; +use near_chain_configs::test_genesis::{TestEpochConfigBuilder, ValidatorsSpec}; +use near_o11y::testonly::init_test_logger; +use near_primitives::block::BlockHeader; +use near_primitives::epoch_manager::EpochConfigStore; +use near_primitives::shard_layout::ShardLayout; +use near_primitives::test_utils::create_test_signer; +use near_primitives::types::{AccountId, AccountInfo, Balance, BlockHeightDelta}; +use std::collections::HashSet; + +/// Genesis, epoch config, and client accounts for the fallback tests: 14 equal-stake validators +/// over 6 shards with 2 mandates each, keeping each chunk's designated subset under 1/3 of stake. +#[derive(Default)] +struct FallbackSetup { + epoch_length: Option, +} + +impl FallbackSetup { + fn new() -> Self { + Self::default() + } + + fn epoch_length(mut self, epoch_length: BlockHeightDelta) -> Self { + self.epoch_length = Some(epoch_length); + self + } + + fn build(self) -> (Vec, Genesis, EpochConfigStore) { + let num_producers = 4; + let num_validators = 14; + let accounts: Vec = + (0..num_validators).map(|i| format!("validator{i}").parse().unwrap()).collect_vec(); + let all_validators: Vec = accounts + .iter() + .map(|account_id| AccountInfo { + public_key: create_test_signer(account_id.as_str()).public_key(), + account_id: account_id.clone(), + amount: Balance::from_near(100), + }) + .collect(); + let validators_spec = + ValidatorsSpec::raw(all_validators, num_producers, num_producers, num_validators); + + let mut genesis_builder = TestLoopBuilder::new_genesis_builder() + .shard_layout(ShardLayout::multi_shard(6, 0)) + .validators_spec(validators_spec); + if let Some(epoch_length) = self.epoch_length { + genesis_builder = genesis_builder.epoch_length(epoch_length); + } + let genesis = genesis_builder.build(); + let epoch_config_store = TestEpochConfigBuilder::from_genesis(&genesis) + .target_validator_mandates_per_shard(2) + .build_store_for_genesis_protocol_version(); + (accounts, genesis, epoch_config_store) + } +} + +/// Asserts every (height, shard) of the genesis epoch has designated validators under 1/3 of total +/// stake, so the fallback's non-designated remainder can reach 2/3. Genesis epoch is representative. +fn assert_fallback_has_enough_stake(node: &TestLoopNode) { + let epoch_manager = node.client().epoch_manager.clone(); + let epoch_id = node.head().epoch_id; + let epoch_length = epoch_manager.get_epoch_config(&epoch_id).unwrap().epoch_length; + let shard_ids: Vec<_> = + epoch_manager.get_shard_layout(&epoch_id).unwrap().shard_ids().collect(); + let total_stake: u128 = all_stake_fallback_assignment(epoch_manager.as_ref(), &epoch_id) + .unwrap() + .assignments() + .iter() + .map(|(_, stake)| stake.as_yoctonear()) + .sum(); + for height in 1..=epoch_length { + for &shard_id in &shard_ids { + let designated: u128 = epoch_manager + .get_chunk_validator_assignments(&epoch_id, shard_id, height) + .unwrap() + .assignments() + .iter() + .map(|(_, stake)| stake.as_yoctonear()) + .sum(); + assert!( + designated * 3 < total_stake, + "designated stake reaches 1/3 at height {height} shard {shard_id}: {designated} of {total_stake}", + ); + } + } +} + +/// Asserts `header`'s block is certified and at least one of its chunks certified via the all-stake +/// fallback: the present designated endorsements alone don't meet the designated 2/3 threshold, so +/// the non-designated remainder must have carried it. +fn assert_certified_via_fallback(node: &TestLoopNode, header: &BlockHeader) { + let client = node.client(); + let core_reader = &client.chain.spice_core_reader; + let epoch_manager = client.epoch_manager.as_ref(); + assert!(core_reader.all_execution_results_exist(header).unwrap(), "block is not certified"); + + let epoch_id = header.epoch_id(); + let mut fallback_certified_chunks = 0; + for shard_id in epoch_manager.get_shard_layout(epoch_id).unwrap().shard_ids() { + let designated = epoch_manager + .get_chunk_validator_assignments(epoch_id, shard_id, header.height()) + .unwrap(); + let designated_present: HashSet = designated + .assignments() + .iter() + .map(|(account_id, _)| account_id) + .filter(|account_id| { + core_reader.get_endorsement(header.hash(), shard_id, account_id).is_some() + }) + .cloned() + .collect(); + if !designated.is_endorsed(&designated_present) { + fallback_certified_chunks += 1; + } + } + assert!( + fallback_certified_chunks > 0, + "every chunk met the designated threshold; none certified via the fallback", + ); +} + +#[test] +#[cfg_attr(not(feature = "protocol_feature_spice"), ignore)] +fn slow_test_spice_all_stake_fallback_certifies_without_designated_endorsements() { + init_test_logger(); + + let (accounts, genesis, epoch_config_store) = FallbackSetup::new().build(); + let mut env = TestLoopBuilder::new() + .genesis(genesis) + .epoch_config_store(epoch_config_store) + .clients(accounts) + .build() + .drop(DropCondition::DesignatedSpiceEndorsements); + assert_fallback_has_enough_stake(&env.node(0)); + + // The certified frontier must keep advancing via the all-stake fallback though every designated + // endorsement is dropped (slowly: ~1 block per fallback window under a total outage). + let target = env.node(0).last_certified_block_header().height() + 4; + env.node_runner(0).run_until( + |node| node.last_certified_block_header().height() >= target, + Duration::seconds(60), + ); + let frontier = env.node(0).last_certified_block_header(); + assert_certified_via_fallback(&env.node(0), frontier.as_ref()); +} + +#[test] +#[cfg_attr(not(feature = "protocol_feature_spice"), ignore)] +fn slow_test_spice_all_stake_fallback_certifies_across_epoch_boundary() { + init_test_logger(); + + let epoch_length = 25; + let (accounts, genesis, epoch_config_store) = + FallbackSetup::new().epoch_length(epoch_length).build(); + let mut env = TestLoopBuilder::new() + .genesis(genesis) + .epoch_config_store(epoch_config_store) + .clients(accounts) + .build(); + assert_fallback_has_enough_stake(&env.node(0)); + + // Run normally up to just before the epoch boundary, then drop designated endorsements: the + // all-stake fallback alone must carry certification across into the next epoch. + env.node_runner(0) + .run_until(|node| node.head().height >= epoch_length - 5, Duration::seconds(60)); + let initial_epoch = env.node(0).head().epoch_id; + let mut env = env.drop(DropCondition::DesignatedSpiceEndorsements); + + // The certified frontier must reach the second epoch (height past the boundary, with a different + // epoch id), proving the fallback certified the boundary chunks under the new assignments. + env.node_runner(0).run_until( + |node| { + let header = node.last_certified_block_header(); + let epoch_id = node.client().epoch_manager.get_epoch_id(header.hash()).unwrap(); + header.height() > epoch_length && epoch_id != initial_epoch + }, + Duration::seconds(120), + ); + let frontier = env.node(0).last_certified_block_header(); + assert_certified_via_fallback(&env.node(0), frontier.as_ref()); +} diff --git a/test-loop-tests/src/tests/spice/mod.rs b/test-loop-tests/src/tests/spice/mod.rs index 48aa5d481c8..342b2130a31 100644 --- a/test-loop-tests/src/tests/spice/mod.rs +++ b/test-loop-tests/src/tests/spice/mod.rs @@ -1,3 +1,4 @@ +mod all_stake_fallback; mod basic; mod congestion; mod malicious_chunk_producer; diff --git a/test-loop-tests/src/utils/network.rs b/test-loop-tests/src/utils/network.rs index 25c970c728d..10f04259821 100644 --- a/test-loop-tests/src/utils/network.rs +++ b/test-loop-tests/src/utils/network.rs @@ -58,6 +58,32 @@ pub fn chunk_endorsement_dropper_by_hash( }) } +/// Drops every SPICE chunk endorsement whose sender is designated for the endorsed chunk, so chunks +/// can only certify via the all-stake fallback. Non-designated endorsements pass through. +pub fn spice_designated_endorsement_dropper( + epoch_manager: Arc, +) -> Box HandlerResult> { + Box::new(move |request| { + let NetworkRequests::SpiceChunkEndorsement(_target, endorsement) = &request else { + return HandlerResult::Unhandled(request); + }; + let Ok(block_info) = epoch_manager.get_block_info(endorsement.block_hash()) else { + return HandlerResult::Unhandled(request); + }; + let Ok(assignments) = epoch_manager.get_chunk_validator_assignments( + block_info.epoch_id(), + endorsement.shard_id(), + block_info.height(), + ) else { + return HandlerResult::Unhandled(request); + }; + if assignments.contains(endorsement.account_id()) { + return HandlerResult::Handled(NetworkResponses::NoResponse); + } + HandlerResult::Unhandled(request) + }) +} + /// Handler to drop all network messages containing chunk endorsements sent /// from a given chunk-validator account. pub fn chunk_endorsement_dropper( diff --git a/test-loop-tests/src/utils/node.rs b/test-loop-tests/src/utils/node.rs index 4454e097233..d82275b6b0a 100644 --- a/test-loop-tests/src/utils/node.rs +++ b/test-loop-tests/src/utils/node.rs @@ -6,6 +6,7 @@ use near_async::messaging::CanSend; use near_async::test_loop::TestLoopV2; use near_async::test_loop::data::TestLoopData; use near_async::time::Duration; +use near_chain::spice::core::get_last_certified_block_header; use near_chain::types::Tip; use near_chain::{Block, BlockHeader}; use near_client::client_actor::ClientActor; @@ -87,6 +88,12 @@ impl<'a> TestLoopNode<'a> { self.block(block_hash) } + pub fn last_certified_block_header(&self) -> Arc { + let chain_store = &self.client().chain.chain_store; + let head_hash = self.head().last_block_hash; + get_last_certified_block_header(chain_store, &head_hash).unwrap() + } + pub fn block(&self, block_hash: CryptoHash) -> Arc { self.client().chain.get_block(&block_hash).unwrap() }