Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion chain/client/src/spice/data_distributor_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use near_async::messaging::Handler;
use near_async::messaging::Sender;
use near_async::time::Duration;
use near_chain::Block;
use near_chain::spice::core::SpiceCoreReader;
use near_chain::spice::core::{SpiceCoreReader, fallback_eligible};
use near_chain::spice::core_writer_actor::ProcessedBlock;
use near_chain::stateless_validation::metrics::PROCESS_CONTRACT_CODE_REQUEST_TIME;
use near_chain_configs::MutableValidatorSigner;
Expand Down Expand Up @@ -329,6 +329,9 @@ impl Handler<SpiceContractCodeResponseMessage> for SpiceDataDistributorActor {

impl Handler<ProcessedBlock> for SpiceDataDistributorActor {
fn handle(&mut self, ProcessedBlock { block_hash }: ProcessedBlock) {
if let Err(err) = self.contribute_fallback_endorsements(&block_hash) {
tracing::error!(target: "spice_data_distribution", ?err, "failed contributing fallback endorsements");
}
if let Err(err) = self.start_waiting_on_data(&block_hash) {
tracing::error!(target: "spice_data_distribution", ?err, ?block_hash, "failure when starting waiting on data");
}
Expand Down Expand Up @@ -838,6 +841,76 @@ impl SpiceDataDistributorActor {

// TODO(spice): Implement a state machine to track all the data we produce or may need. This
// would help make sure that we cannot have and request data at the same time.
/// As a non-designated, non-tracking epoch validator, certify overdue chunks via the all-stake
/// fallback by pulling each chunk's witness so we can validate and endorse it. Re-evaluated per
/// block; once we've endorsed a chunk there's nothing more to do.
fn contribute_fallback_endorsements(&mut self, block_hash: &CryptoHash) -> Result<(), Error> {
let Some(signer) = self.validator_signer.get() else {
return Ok(());
};
let me = signer.validator_id();
let block = self.chain_store.get_block(block_hash)?;
let carrying_height = block.header().height() + 1;

for chunk_info in self.core_reader.get_uncertified_chunks(block_hash)? {
let chunk_id = &chunk_info.chunk_id;
if !fallback_eligible(&self.chain_store, carrying_height, block_hash, chunk_id)? {
continue;
}
let chunk_block = self.chain_store.get_block(&chunk_id.block_hash)?;
let epoch_id = chunk_block.header().epoch_id();
if self.epoch_manager.get_validator_by_account_id(epoch_id, me).is_err() {
continue;
}
let assignments = self.epoch_manager.get_chunk_validator_assignments(
epoch_id,
chunk_id.shard_id,
chunk_block.header().height(),
)?;
if assignments.contains(me) {
continue;
}
// We already endorsed (the witness-validation flow broadcasts once eligible).
if self.core_reader.endorsement_exists(&chunk_id.block_hash, chunk_id.shard_id, me) {
continue;
}
// A non-tracker has no result to endorse; pull the witness so it can produce one.
let tracks_shard = self.shard_tracker.should_apply_chunk(
ApplyChunksMode::IsCaughtUp,
chunk_block.header().prev_hash(),
chunk_id.shard_id,
);
if !tracks_shard {
self.start_waiting_on_fallback_witness(chunk_id, &chunk_block, me)?;
}
}
Ok(())
}

/// Pull a chunk's witness (not received by non-designated validators in the initial
/// distribution) so we can apply the chunk and endorse it for the fallback.
fn start_waiting_on_fallback_witness(
&mut self,
chunk_id: &SpiceChunkId,
chunk_block: &Block,
me: &AccountId,
) -> Result<(), Error> {
let id = SpiceDataIdentifier::Witness {
block_hash: chunk_id.block_hash,
shard_id: chunk_id.shard_id,
};
let (_recipients, producers) = self.recipients_and_producers(&id, chunk_block)?;
if producers.contains(me)
|| self.waiting_on_data.contains_key(&id)
|| self.recently_decoded_data.contains(&id)
|| self.is_data_known(me, chunk_block, &id)
{
return Ok(());
}
self.waiting_on_data.insert(id, HashMap::new());
Ok(())
}

fn start_waiting_on_data(&mut self, block_hash: &CryptoHash) -> Result<(), Error> {
let signer = self.validator_signer.get();
let me = signer.as_ref().map(|signer| signer.validator_id());
Expand Down
8 changes: 7 additions & 1 deletion core/primitives/src/spice/chunk_endorsement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::validator_signer::ValidatorSigner;
use borsh::{BorshDeserialize, BorshSerialize};
use near_crypto::{PublicKey, Signature};
use near_primitives_core::hash::CryptoHash;
use near_primitives_core::types::AccountId;
use near_primitives_core::types::{AccountId, ShardId};
use near_schema_checker_lib::ProtocolSchema;
use std::fmt::Debug;

Expand Down Expand Up @@ -53,6 +53,12 @@ impl SpiceChunkEndorsement {
}
}

pub fn shard_id(&self) -> ShardId {
match self {
Self::V1(v1) => v1.chunk_id.shard_id,
}
}

/// Checks signatures are returns SpiceVerifiedEndorsement if it's correct.
/// NOTE: that it doesn't do any additional validations apart from checking signature.
pub fn into_verified(self, public_key: &PublicKey) -> Option<SpiceVerifiedEndorsement> {
Expand Down
17 changes: 17 additions & 0 deletions test-loop-tests/src/setup/drop_condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::peer_manager_actor::NetworkRequestHandler;
use super::state::NodeExecutionData;
use crate::utils::network::{
block_dropper_by_height, chunk_endorsement_dropper, chunk_endorsement_dropper_by_hash,
spice_designated_endorsement_dropper,
};
use near_async::messaging::{CanSend, LateBoundSender};
use near_async::test_loop::data::TestLoopData;
Expand Down Expand Up @@ -34,6 +35,10 @@ pub enum DropCondition {
ChunksProducedByHeight(HashMap<ShardId, Vec<bool>>),
// Drops Block broadcast messages with height in `self.0`
BlocksByHeight(HashSet<BlockHeight>),
/// Drops every SPICE chunk endorsement whose sender is designated for the endorsed chunk,
/// disabling the designated certification path so chunks can certify only via the all-stake
/// fallback.
DesignatedSpiceEndorsements,
}

/// Stores all chunks ever observed on chain. Determines if a chunk can be
Expand Down Expand Up @@ -126,9 +131,21 @@ impl NodeExecutionData {
DropCondition::BlocksByHeight(heights) => {
self.register_drop_blocks_by_height(test_loop_data, heights);
}
DropCondition::DesignatedSpiceEndorsements => {
self.register_drop_designated_spice_endorsements(test_loop_data);
}
}
}

fn register_drop_designated_spice_endorsements(&self, test_loop_data: &mut TestLoopData) {
let client_actor = test_loop_data.get(&self.client_sender.actor_handle());
let epoch_manager = client_actor.client.chain.epoch_manager.clone();
self.register_override_handler(
test_loop_data,
spice_designated_endorsement_dropper(epoch_manager),
);
}

fn register_drop_chunks_validated_by(
&self,
test_loop_data: &mut TestLoopData,
Expand Down
188 changes: 188 additions & 0 deletions test-loop-tests/src/tests/spice/all_stake_fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use crate::setup::builder::TestLoopBuilder;
use crate::setup::drop_condition::DropCondition;
use crate::utils::node::TestLoopNode;
use itertools::Itertools;
use near_async::time::Duration;
use near_chain::spice::core::all_stake_fallback_assignment;
use near_chain_configs::Genesis;
use near_chain_configs::test_genesis::{TestEpochConfigBuilder, ValidatorsSpec};
use near_o11y::testonly::init_test_logger;
use near_primitives::block::BlockHeader;
use near_primitives::epoch_manager::EpochConfigStore;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::test_utils::create_test_signer;
use near_primitives::types::{AccountId, AccountInfo, Balance, BlockHeightDelta};
use std::collections::HashSet;

/// Genesis, epoch config, and client accounts for the fallback tests: 14 equal-stake validators
/// over 6 shards with 2 mandates each, keeping each chunk's designated subset under 1/3 of stake.
#[derive(Default)]
struct FallbackSetup {
epoch_length: Option<BlockHeightDelta>,
}

impl FallbackSetup {
fn new() -> Self {
Self::default()
}

fn epoch_length(mut self, epoch_length: BlockHeightDelta) -> Self {
self.epoch_length = Some(epoch_length);
self
}

fn build(self) -> (Vec<AccountId>, Genesis, EpochConfigStore) {
let num_producers = 4;
let num_validators = 14;
let accounts: Vec<AccountId> =
(0..num_validators).map(|i| format!("validator{i}").parse().unwrap()).collect_vec();
let all_validators: Vec<AccountInfo> = accounts
.iter()
.map(|account_id| AccountInfo {
public_key: create_test_signer(account_id.as_str()).public_key(),
account_id: account_id.clone(),
amount: Balance::from_near(100),
})
.collect();
let validators_spec =
ValidatorsSpec::raw(all_validators, num_producers, num_producers, num_validators);

let mut genesis_builder = TestLoopBuilder::new_genesis_builder()
.shard_layout(ShardLayout::multi_shard(6, 0))
.validators_spec(validators_spec);
if let Some(epoch_length) = self.epoch_length {
genesis_builder = genesis_builder.epoch_length(epoch_length);
}
let genesis = genesis_builder.build();
let epoch_config_store = TestEpochConfigBuilder::from_genesis(&genesis)
.target_validator_mandates_per_shard(2)
.build_store_for_genesis_protocol_version();
(accounts, genesis, epoch_config_store)
}
}

/// Asserts every (height, shard) of the genesis epoch has designated validators under 1/3 of total
/// stake, so the fallback's non-designated remainder can reach 2/3. Genesis epoch is representative.
fn assert_fallback_has_enough_stake(node: &TestLoopNode) {
let epoch_manager = node.client().epoch_manager.clone();
let epoch_id = node.head().epoch_id;
let epoch_length = epoch_manager.get_epoch_config(&epoch_id).unwrap().epoch_length;
let shard_ids: Vec<_> =
epoch_manager.get_shard_layout(&epoch_id).unwrap().shard_ids().collect();
let total_stake: u128 = all_stake_fallback_assignment(epoch_manager.as_ref(), &epoch_id)
.unwrap()
.assignments()
.iter()
.map(|(_, stake)| stake.as_yoctonear())
.sum();
for height in 1..=epoch_length {
for &shard_id in &shard_ids {
let designated: u128 = epoch_manager
.get_chunk_validator_assignments(&epoch_id, shard_id, height)
.unwrap()
.assignments()
.iter()
.map(|(_, stake)| stake.as_yoctonear())
.sum();
assert!(
designated * 3 < total_stake,
"designated stake reaches 1/3 at height {height} shard {shard_id}: {designated} of {total_stake}",
);
}
}
}

/// Asserts `header`'s block is certified and at least one of its chunks certified via the all-stake
/// fallback: the present designated endorsements alone don't meet the designated 2/3 threshold, so
/// the non-designated remainder must have carried it.
fn assert_certified_via_fallback(node: &TestLoopNode, header: &BlockHeader) {
let client = node.client();
let core_reader = &client.chain.spice_core_reader;
let epoch_manager = client.epoch_manager.as_ref();
assert!(core_reader.all_execution_results_exist(header).unwrap(), "block is not certified");

let epoch_id = header.epoch_id();
let mut fallback_certified_chunks = 0;
for shard_id in epoch_manager.get_shard_layout(epoch_id).unwrap().shard_ids() {
let designated = epoch_manager
.get_chunk_validator_assignments(epoch_id, shard_id, header.height())
.unwrap();
let designated_present: HashSet<AccountId> = designated
.assignments()
.iter()
.map(|(account_id, _)| account_id)
.filter(|account_id| {
core_reader.get_endorsement(header.hash(), shard_id, account_id).is_some()
})
.cloned()
.collect();
if !designated.is_endorsed(&designated_present) {
fallback_certified_chunks += 1;
}
}
assert!(
fallback_certified_chunks > 0,
"every chunk met the designated threshold; none certified via the fallback",
);
}

#[test]
#[cfg_attr(not(feature = "protocol_feature_spice"), ignore)]
fn slow_test_spice_all_stake_fallback_certifies_without_designated_endorsements() {
init_test_logger();

let (accounts, genesis, epoch_config_store) = FallbackSetup::new().build();
let mut env = TestLoopBuilder::new()
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(accounts)
.build()
.drop(DropCondition::DesignatedSpiceEndorsements);
assert_fallback_has_enough_stake(&env.node(0));

// The certified frontier must keep advancing via the all-stake fallback though every designated
// endorsement is dropped (slowly: ~1 block per fallback window under a total outage).
let target = env.node(0).last_certified_block_header().height() + 4;
env.node_runner(0).run_until(
|node| node.last_certified_block_header().height() >= target,
Duration::seconds(60),
);
let frontier = env.node(0).last_certified_block_header();
assert_certified_via_fallback(&env.node(0), frontier.as_ref());
}

#[test]
#[cfg_attr(not(feature = "protocol_feature_spice"), ignore)]
fn slow_test_spice_all_stake_fallback_certifies_across_epoch_boundary() {
init_test_logger();

let epoch_length = 25;
let (accounts, genesis, epoch_config_store) =
FallbackSetup::new().epoch_length(epoch_length).build();
let mut env = TestLoopBuilder::new()
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(accounts)
.build();
assert_fallback_has_enough_stake(&env.node(0));

// Run normally up to just before the epoch boundary, then drop designated endorsements: the
// all-stake fallback alone must carry certification across into the next epoch.
env.node_runner(0)
.run_until(|node| node.head().height >= epoch_length - 5, Duration::seconds(60));
let initial_epoch = env.node(0).head().epoch_id;
let mut env = env.drop(DropCondition::DesignatedSpiceEndorsements);

// The certified frontier must reach the second epoch (height past the boundary, with a different
// epoch id), proving the fallback certified the boundary chunks under the new assignments.
env.node_runner(0).run_until(
|node| {
let header = node.last_certified_block_header();
let epoch_id = node.client().epoch_manager.get_epoch_id(header.hash()).unwrap();
header.height() > epoch_length && epoch_id != initial_epoch
},
Duration::seconds(120),
);
let frontier = env.node(0).last_certified_block_header();
assert_certified_via_fallback(&env.node(0), frontier.as_ref());
}
1 change: 1 addition & 0 deletions test-loop-tests/src/tests/spice/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod all_stake_fallback;
mod basic;
mod congestion;
mod malicious_chunk_producer;
Expand Down
26 changes: 26 additions & 0 deletions test-loop-tests/src/utils/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,32 @@ pub fn chunk_endorsement_dropper_by_hash(
})
}

/// Drops every SPICE chunk endorsement whose sender is designated for the endorsed chunk, so chunks
/// can only certify via the all-stake fallback. Non-designated endorsements pass through.
pub fn spice_designated_endorsement_dropper(
epoch_manager: Arc<dyn EpochManagerAdapter>,
) -> Box<dyn Fn(NetworkRequests) -> HandlerResult> {
Box::new(move |request| {
let NetworkRequests::SpiceChunkEndorsement(_target, endorsement) = &request else {
return HandlerResult::Unhandled(request);
};
let Ok(block_info) = epoch_manager.get_block_info(endorsement.block_hash()) else {
return HandlerResult::Unhandled(request);
};
let Ok(assignments) = epoch_manager.get_chunk_validator_assignments(
block_info.epoch_id(),
endorsement.shard_id(),
block_info.height(),
) else {
return HandlerResult::Unhandled(request);
};
if assignments.contains(endorsement.account_id()) {
return HandlerResult::Handled(NetworkResponses::NoResponse);
}
HandlerResult::Unhandled(request)
})
}

/// Handler to drop all network messages containing chunk endorsements sent
/// from a given chunk-validator account.
pub fn chunk_endorsement_dropper(
Expand Down
Loading
Loading