diff --git a/chain/chain/src/runtime/tests.rs b/chain/chain/src/runtime/tests.rs index e954eb46be2..9a15a98bafe 100644 --- a/chain/chain/src/runtime/tests.rs +++ b/chain/chain/src/runtime/tests.rs @@ -214,6 +214,7 @@ impl TestEnv { ) .unwrap() .commit(); + epoch_manager.read().seed_chunk_producers_for_test(&genesis_hash); Self { epoch_manager, runtime, @@ -386,6 +387,7 @@ impl TestEnv { ) .unwrap() .commit(); + self.epoch_manager.read().seed_chunk_producers_for_test(&new_hash); let shard_layout = self.epoch_manager.get_shard_layout_from_prev_block(&new_hash).unwrap(); let mut new_receipts = HashMap::<_, Vec>::new(); for receipt in all_receipts { @@ -904,6 +906,7 @@ fn test_state_sync() { ) .unwrap() .commit(); + new_env.epoch_manager.read().seed_chunk_producers_for_test(&cur_hash); new_env.head.height = i; new_env.head.last_block_hash = cur_hash; new_env.head.prev_block_hash = prev_hash; diff --git a/chain/chain/src/tests/garbage_collection.rs b/chain/chain/src/tests/garbage_collection.rs index 356afdce987..dbf4cbcfd56 100644 --- a/chain/chain/src/tests/garbage_collection.rs +++ b/chain/chain/src/tests/garbage_collection.rs @@ -104,6 +104,15 @@ fn do_fork( ) .unwrap(); store_update.merge(epoch_manager_update.into()); + // Mirror production so the nightly EarlyKickout aggregator reads seeded + // rows instead of tripping the missing-row assert. + store_update + .save_chunk_producers_for_header( + epoch_manager.as_ref(), + block.header(), + PROTOCOL_VERSION, + ) + .unwrap(); let mut trie_changes_shards = Vec::new(); for shard_id in 0..num_shards { @@ -766,6 +775,11 @@ fn add_block( ) .unwrap(); store_update.merge(epoch_manager_update.into()); + // Mirror production so the nightly EarlyKickout aggregator reads seeded rows + // instead of tripping the missing-row assert. + store_update + .save_chunk_producers_for_header(epoch_manager, block.header(), PROTOCOL_VERSION) + .unwrap(); store_update.commit().unwrap(); *prev_block = block.clone(); } diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index fd7c63789b8..27bd5b77054 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -1895,7 +1895,6 @@ impl EpochManager { &epoch_info, &shard_layout, &prev_hash, - prev_height + 1, ); let block_info = self.get_block_info(&cur_hash)?; aggregator.update_tail( @@ -1932,7 +1931,6 @@ impl EpochManager { epoch_info: &EpochInfo, shard_layout: &ShardLayout, prev_hash: &CryptoHash, - height: BlockHeight, ) -> Option> { // `DBCol::ChunkProducers` only exists under nightly (as does an enabled // EarlyKickout); stable always uses the legacy sampler. @@ -1955,6 +1953,11 @@ impl EpochManager { if anchor_block_info.epoch_id() != epoch_id { return None; } + // Reproduce what the writer stored at the anchor: a sample at + // `anchor.height + 2`, not the chunk height. They differ only when + // `prev` skipped heights above `anchor` (e.g. post epoch-sync). + let anchor_sample_height = + anchor_block_info.height() + CHUNK_GRANDPARENT_ANCHOR_HEIGHT_OFFSET; let mut chunk_producers = HashMap::new(); for shard_id in shard_layout.shard_ids() { let key = get_block_shard_id(&anchor, shard_id); @@ -1962,20 +1965,49 @@ impl EpochManager { .store .store_ref() .get_ser::(DBCol::ChunkProducers, &key) - .and_then(|stake| epoch_info.get_validator_id(stake.account_id()).copied()) { - Some(validator_id) => validator_id, None => { - // Row absent (writer bug) or account not in the epoch; - // keep aggregation alive with the canonical sample. + // Only legitimate same-epoch miss: the epoch-sync first + // block, installed without seeding (client/src/sync/epoch.rs). + debug_assert!( + anchor == *anchor_block_info.epoch_first_block(), + "unexpected missing ChunkProducers row for non-first-block anchor {anchor} (writer bug?)" + ); tracing::debug!( target: "epoch_tracker", ?anchor, %shard_id, - "chunk producer missing in DB during aggregation, sampling canonically", + "chunk producer row absent during aggregation, sampling canonically", ); - epoch_info.sample_chunk_producer(shard_layout, shard_id, height)? + epoch_info.sample_chunk_producer( + shard_layout, + shard_id, + anchor_sample_height, + )? } + Some(stake) => match epoch_info.get_validator_id(stake.account_id()).copied() { + Some(validator_id) => validator_id, + None => { + // Unreachable in correct operation: the same-epoch + // writer sampled this stake from this `epoch_info`. + debug_assert!( + false, + "account {} from ChunkProducers row not in epoch {epoch_id:?}", + stake.account_id() + ); + tracing::debug!( + target: "epoch_tracker", + ?anchor, + %shard_id, + "chunk producer not in epoch during aggregation, sampling canonically", + ); + epoch_info.sample_chunk_producer( + shard_layout, + shard_id, + anchor_sample_height, + )? + } + }, }; chunk_producers.insert(shard_id, validator_id); } @@ -1983,11 +2015,67 @@ impl EpochManager { } #[cfg(not(feature = "nightly"))] { - let _ = (epoch_id, epoch_info, shard_layout, prev_hash, height); + let _ = (epoch_id, epoch_info, shard_layout, prev_hash); None } } + /// Test-only: seed `DBCol::ChunkProducers` for an already-recorded block, + /// mirroring `ChainStoreUpdate::save_chunk_producers_for_header` so nightly + /// tests exercise the DB-anchored path the harnesses otherwise never seed. + /// + /// Must run after the block's `record_block_info` commit (reads the recorded + /// `BlockInfo`). Idempotent; EarlyKickout-gated (no-op on stable). + pub fn seed_chunk_producers_for_test(&self, block_hash: &CryptoHash) { + #[cfg(feature = "nightly")] + { + use near_primitives::utils::get_block_shard_id; + use near_store::DBCol; + + let block_info = + self.get_block_info(block_hash).expect("block must be recorded before seeding"); + // Gate on the anchor's own epoch, mirroring production's + // `get_epoch_protocol_version(header.epoch_id())`: a row exists iff the + // anchor's epoch has EarlyKickout, which is exactly when the aggregator's + // same-epoch read path applies. Gating on the epoch *after* the anchor + // would seed dead rows for last-of-epoch anchors across an activation edge. + let own_epoch_info = + self.get_epoch_info(block_info.epoch_id()).expect("anchor epoch info"); + if !ProtocolFeature::EarlyKickout.enabled(own_epoch_info.protocol_version()) { + return; + } + // Sample from the epoch *after* the anchor, matching + // `save_chunk_producers_for_header`'s `get_epoch_id_from_prev_block`. + let epoch_id = if self + .is_next_block_epoch_start(block_hash) + .expect("block must be recorded before seeding") + { + self.get_next_epoch_id(block_hash).expect("next epoch id") + } else { + self.get_epoch_id(block_hash).expect("epoch id") + }; + let epoch_info = self.get_epoch_info(&epoch_id).expect("epoch info"); + let shard_layout = self.get_shard_layout(&epoch_id).expect("shard layout"); + let height = block_info.height() + CHUNK_GRANDPARENT_ANCHOR_HEIGHT_OFFSET; + let mut store_update = self.store.store_ref().store_update(); + for shard_id in shard_layout.shard_ids() { + if let Some(validator_id) = + epoch_info.sample_chunk_producer(&shard_layout, shard_id, height) + { + let validator_stake = epoch_info.get_validator(validator_id); + store_update.insert_ser( + DBCol::ChunkProducers, + &get_block_shard_id(block_hash, shard_id), + &validator_stake, + ); + } + } + store_update.commit(); + } + #[cfg(not(feature = "nightly"))] + let _ = block_hash; + } + /// Get the shard split to include in the block header, if any. /// /// This method is expected to be called during the production of the last block of an epoch. diff --git a/chain/epoch-manager/src/shard_tracker.rs b/chain/epoch-manager/src/shard_tracker.rs index d98703f4a37..8c1f4f2aa9b 100644 --- a/chain/epoch-manager/src/shard_tracker.rs +++ b/chain/epoch-manager/src/shard_tracker.rs @@ -701,6 +701,8 @@ mod tests { ) .unwrap() .commit(); + // Seed once: `record_blocks` funnels through here. + epoch_manager.seed_chunk_producers_for_test(&cur_h); } // Simulates block production over the given height range using the specified protocol version and block hashes. diff --git a/chain/epoch-manager/src/test_utils.rs b/chain/epoch-manager/src/test_utils.rs index bd479c6f69a..229d6cda3d5 100644 --- a/chain/epoch-manager/src/test_utils.rs +++ b/chain/epoch-manager/src/test_utils.rs @@ -349,6 +349,7 @@ pub fn record_block_with_final_block_hash( ) .unwrap() .commit(); + epoch_manager.seed_chunk_producers_for_test(&cur_h); } pub fn record_block( @@ -403,6 +404,8 @@ pub fn record_block_with_version( ) .unwrap() .commit(); + // Seed once: `record_block` and `record_blocks` funnel through here. + epoch_manager.seed_chunk_producers_for_test(&cur_h); } pub fn record_blocks( @@ -465,5 +468,7 @@ pub fn block_info( } pub fn record_with_block_info(epoch_manager: &mut EpochManager, block_info: BlockInfo) { + let hash = *block_info.hash(); epoch_manager.record_block_info(block_info, [0; 32]).unwrap().commit(); + epoch_manager.seed_chunk_producers_for_test(&hash); } diff --git a/chain/epoch-manager/src/tests/mod.rs b/chain/epoch-manager/src/tests/mod.rs index 42a2b35aaf5..6e569c12bb8 100644 --- a/chain/epoch-manager/src/tests/mod.rs +++ b/chain/epoch-manager/src/tests/mod.rs @@ -519,18 +519,21 @@ fn test_validator_reward_one_validator() { rng_seed, ) .unwrap(); + epoch_manager.seed_chunk_producers_for_test(&h[0]); epoch_manager .record_block_info( block_info(h[1], 1, 1, h[0], h[0], h[1], vec![true], total_supply, num_validators), rng_seed, ) .unwrap(); + epoch_manager.seed_chunk_producers_for_test(&h[1]); epoch_manager .record_block_info( block_info(h[2], 2, 2, h[1], h[1], h[1], vec![true], total_supply, num_validators), rng_seed, ) .unwrap(); + epoch_manager.seed_chunk_producers_for_test(&h[2]); let mut validator_online_ratio = HashMap::new(); validator_online_ratio.insert( "test2".parse().unwrap(), @@ -931,6 +934,7 @@ fn test_expected_chunks() { ) .unwrap() .commit(); + epoch_manager.read().seed_chunk_producers_for_test(curr_block); prev_block = *curr_block; if epoch_id != initial_epoch_id { @@ -1015,6 +1019,7 @@ fn test_expected_chunks_prev_block_not_produced() { ) .unwrap() .commit(); + epoch_manager.read().seed_chunk_producers_for_test(curr_block); prev_block = *curr_block; } if epoch_id != initial_epoch_id { @@ -1561,6 +1566,8 @@ fn test_chunk_producer_kickout() { rng_seed, ) .unwrap(); + // Uncommitted record, but inline aggregation still reads the anchor row. + em.read().seed_chunk_producers_for_test(curr_block); } let last_epoch_info = @@ -1636,6 +1643,8 @@ fn test_chunk_validator_kickout_using_production_stats() { rng_seed, ) .unwrap(); + // Uncommitted record, but inline aggregation still reads the anchor row. + em.read().seed_chunk_producers_for_test(curr_block); } let last_epoch_info = @@ -1747,6 +1756,7 @@ fn test_chunk_validator_kickout_using_endorsement_stats() { rng_seed, ) .unwrap(); + em.read().seed_chunk_producers_for_test(curr_block); } let last_epoch_info = @@ -2347,6 +2357,7 @@ fn test_final_block_consistency() { ) .unwrap() .commit(); + epoch_manager.seed_chunk_producers_for_test(&h[5]); let new_epoch_aggregator_final_hash = epoch_manager.epoch_info_aggregator.last_block_hash; assert_eq!(epoch_aggregator_final_hash, new_epoch_aggregator_final_hash); } @@ -3447,6 +3458,7 @@ fn test_possible_epochs_of_height_around_tip() { num_validators, ); epoch_manager.write().record_block_info(block_info, [0; 32]).unwrap().commit(); + epoch_manager.read().seed_chunk_producers_for_test(&h[i]); let tip = Tip { height, last_block_hash: h[i], @@ -3511,6 +3523,7 @@ fn test_possible_epochs_of_height_around_tip() { num_validators, ); epoch_manager.write().record_block_info(block_info, [0; 32]).unwrap().commit(); + epoch_manager.read().seed_chunk_producers_for_test(&h[i]); let tip = Tip { height, last_block_hash: h[i], @@ -3692,6 +3705,66 @@ fn test_is_next_block_in_next_epoch_spice_gate() { ); } +/// Records one block (`prev == default` for genesis) and seeds its anchor row. +/// +/// Endorsements are built at the chunk height (`prev.height + 1`), which differs +/// from the block height only when the block skips heights above its parent. +#[cfg(feature = "nightly")] +fn record_seeded_block( + em: &mut EpochManager, + hash: CryptoHash, + height: BlockHeight, + prev: CryptoHash, +) { + let genesis = prev == CryptoHash::default(); + let epoch_id = if genesis { EpochId::default() } else { em.get_epoch_id(&prev).unwrap() }; + let chunk_height = + if genesis { height } else { em.get_block_info(&prev).unwrap().height() + 1 }; + let shard_layout = em.get_shard_layout(&epoch_id).unwrap(); + let num_shards = shard_layout.shard_ids().count(); + let chunk_endorsements = ChunkEndorsementsBitmap::from_endorsements( + shard_layout + .shard_ids() + .map(|shard_id| { + let assignments = + em.get_chunk_validator_assignments(&epoch_id, shard_id, chunk_height).unwrap(); + vec![true; assignments.assignments().iter().len()] + }) + .collect(), + ); + em.record_block_info( + BlockInfo::new( + hash, + height, + height.saturating_sub(2), + prev, + prev, + vec![], + vec![true; num_shards], + DEFAULT_TOTAL_SUPPLY, + PROTOCOL_VERSION, + PROTOCOL_VERSION, + height * NUM_NS_IN_SECOND, + chunk_endorsements, + None, + ), + [0; 32], + ) + .unwrap() + .commit(); + em.seed_chunk_producers_for_test(&hash); +} + +/// Records the consecutive chain `h` (`h[0]` genesis) and seeds each anchor row. +#[cfg(feature = "nightly")] +fn record_seeded_anchored_chain(em: &mut EpochManager, h: &[CryptoHash]) { + let mut prev = CryptoHash::default(); + for (height, hash) in h.iter().enumerate() { + record_seeded_block(em, *hash, height as u64, prev); + prev = *hash; + } +} + /// Aggregator attributes chunk production via the anchor's DB row, not the canonical sampler. #[cfg(feature = "nightly")] #[test] @@ -3714,46 +3787,7 @@ fn test_aggregator_anchored_chunk_producers() { Rational32::new(0, 1), ); let h = hash_range(4); - - let mut prev = CryptoHash::default(); - for (height, hash) in h.iter().enumerate() { - let epoch_id = - if height == 0 { EpochId::default() } else { em.get_epoch_id(&prev).unwrap() }; - let shard_layout = em.get_shard_layout(&epoch_id).unwrap(); - let num_shards = shard_layout.shard_ids().count(); - let chunk_endorsements = ChunkEndorsementsBitmap::from_endorsements( - shard_layout - .shard_ids() - .map(|shard_id| { - let assignments = em - .get_chunk_validator_assignments(&epoch_id, shard_id, height as u64) - .unwrap(); - vec![true; assignments.assignments().iter().len()] - }) - .collect(), - ); - em.record_block_info( - BlockInfo::new( - *hash, - height as u64, - (height as u64).saturating_sub(2), - prev, - prev, - vec![], - vec![true; num_shards], - DEFAULT_TOTAL_SUPPLY, - PROTOCOL_VERSION, - PROTOCOL_VERSION, - height as u64 * NUM_NS_IN_SECOND, - chunk_endorsements, - None, - ), - [0; 32], - ) - .unwrap() - .commit(); - prev = *hash; - } + record_seeded_anchored_chain(&mut em, &h); let epoch_id = em.get_epoch_id(&h[3]).unwrap(); let epoch_info = em.get_epoch_info(&epoch_id).unwrap(); @@ -3793,3 +3827,145 @@ fn test_aggregator_anchored_chunk_producers() { ); } } + +/// Missing anchor row at the epoch's first block (the epoch-sync case) must fall +/// back to the sampler without tripping the missing-row `debug_assert`. +#[cfg(feature = "nightly")] +#[test] +fn test_aggregator_missing_epoch_first_block_row_falls_back() { + use near_primitives::utils::get_block_shard_id; + use near_store::DBCol; + + let stake_amount = Balance::from_yoctonear(1_000_000); + let validators = + vec![("test1".parse().unwrap(), stake_amount), ("test2".parse().unwrap(), stake_amount)]; + let mut em = setup_epoch_manager( + validators, + 10, + 1, + 2, + 10, + 10, + 0, + default_reward_calculator(), + Rational32::new(0, 1), + ); + let h = hash_range(4); + record_seeded_anchored_chain(&mut em, &h); + + let epoch_id = em.get_epoch_id(&h[3]).unwrap(); + let epoch_info = em.get_epoch_info(&epoch_id).unwrap(); + let shard_layout = em.get_shard_layout(&epoch_id).unwrap(); + + // `h[1]` is the epoch's first real block (genesis `h[0]` is pre-genesis); + // the height-3 chunk anchors at it. Drop its row to mimic epoch-sync. + assert_eq!(&h[1], em.get_block_info(&h[1]).unwrap().epoch_first_block()); + { + let mut update = em.store.store_ref().store_update(); + for shard_id in shard_layout.shard_ids() { + update.delete(DBCol::ChunkProducers, &get_block_shard_id(&h[1], shard_id)); + } + update.commit(); + } + + // Must not panic despite the missing first-block row. + let aggregator = em.get_epoch_info_aggregator_upto_last(&h[3]).unwrap(); + + // All heights resolve to the plain sampler (height 3 via the first-block miss). + for shard_id in shard_layout.shard_ids() { + let stats = &aggregator.shard_tracker[&shard_id]; + let mut expected: HashMap = HashMap::new(); + for height in 1..=3 { + let id = epoch_info.sample_chunk_producer(&shard_layout, shard_id, height).unwrap(); + *expected.entry(id).or_default() += 1; + } + for validator_id in [0, 1] { + let expected_count = expected.get(&validator_id).copied().unwrap_or(0); + let actual_count = stats.get(&validator_id).map(|s| s.expected()).unwrap_or(0); + assert_eq!( + actual_count, expected_count, + "chunk production attribution mismatch for validator {validator_id} on shard {shard_id}" + ); + } + } +} + +/// Under a skip, the missing-row fallback must sample at `anchor.height + 2`, not +/// the chunk height. With `prev` skipping above the grandparent anchor (the +/// post-epoch-sync shape), those differ, so the producer is anchor-determined. +#[cfg(feature = "nightly")] +#[test] +fn test_aggregator_skip_anchor_uses_anchor_height() { + use near_primitives::utils::get_block_shard_id; + use near_store::DBCol; + + let stake_amount = Balance::from_yoctonear(1_000_000); + let validators = + vec![("test1".parse().unwrap(), stake_amount), ("test2".parse().unwrap(), stake_amount)]; + let mut em = setup_epoch_manager( + validators, + 10, + 1, + 2, + 10, + 10, + 0, + default_reward_calculator(), + Rational32::new(0, 1), + ); + let h = hash_range(5); + // Deliberate skip: `h[3]` builds on `h[1]`, skipping height 2. + record_seeded_block(&mut em, h[0], 0, CryptoHash::default()); + record_seeded_block(&mut em, h[1], 1, h[0]); + record_seeded_block(&mut em, h[3], 3, h[1]); + record_seeded_block(&mut em, h[4], 4, h[3]); + + let epoch_id = em.get_epoch_id(&h[4]).unwrap(); + let epoch_info = em.get_epoch_info(&epoch_id).unwrap(); + let shard_layout = em.get_shard_layout(&epoch_id).unwrap(); + + // `h[4]`'s anchor is `h[3].prev == h[1]` (height 1), but its chunk height is + // `h[3].height + 1 == 4`. Dropping `h[1]`'s row forces the fallback: the fix + // samples at `anchor.height + 2 == 3`; the old code sampled at chunk height 4. + assert_eq!(&h[1], em.get_block_info(&h[1]).unwrap().epoch_first_block()); + { + let mut update = em.store.store_ref().store_update(); + for shard_id in shard_layout.shard_ids() { + update.delete(DBCol::ChunkProducers, &get_block_shard_id(&h[1], shard_id)); + } + update.commit(); + } + + // The test only discriminates the fix where `sample(3) != sample(4)`. + let discriminating = shard_layout.shard_ids().any(|shard_id| { + epoch_info.sample_chunk_producer(&shard_layout, shard_id, 3) + != epoch_info.sample_chunk_producer(&shard_layout, shard_id, 4) + }); + assert!( + discriminating, + "config does not exercise the skip divergence (sample(3) == sample(4) on every shard)" + ); + + // Must not panic: anchor `h[1]` is `epoch_first_block`, so the assert holds. + let aggregator = em.get_epoch_info_aggregator_upto_last(&h[4]).unwrap(); + + // Counted blocks resolve to: h[1] -> sample(1) (genesis-parent None arm), + // h[3] -> sample(2) (anchor h[0] seeded), h[4] -> sample(3) (anchor h[1] + // absent -> fallback at anchor.height + 2). The old code gave h[4] sample(4). + for shard_id in shard_layout.shard_ids() { + let stats = &aggregator.shard_tracker[&shard_id]; + let mut expected: HashMap = HashMap::new(); + for height in [1, 2, 3] { + let id = epoch_info.sample_chunk_producer(&shard_layout, shard_id, height).unwrap(); + *expected.entry(id).or_default() += 1; + } + for validator_id in [0, 1] { + let expected_count = expected.get(&validator_id).copied().unwrap_or(0); + let actual_count = stats.get(&validator_id).map(|s| s.expected()).unwrap_or(0); + assert_eq!( + actual_count, expected_count, + "chunk production attribution mismatch for validator {validator_id} on shard {shard_id}" + ); + } + } +}