Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chain/chain/src/runtime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl TestEnv {
)
.unwrap()
.commit();
epoch_manager.read().seed_chunk_producers_for_test(&genesis_hash);
Self {
epoch_manager,
runtime,
Expand Down Expand Up @@ -386,6 +387,7 @@ impl TestEnv {
)
.unwrap()
.commit();
self.epoch_manager.read().seed_chunk_producers_for_test(&new_hash);
let shard_layout = self.epoch_manager.get_shard_layout_from_prev_block(&new_hash).unwrap();
let mut new_receipts = HashMap::<_, Vec<Receipt>>::new();
for receipt in all_receipts {
Expand Down Expand Up @@ -904,6 +906,7 @@ fn test_state_sync() {
)
.unwrap()
.commit();
new_env.epoch_manager.read().seed_chunk_producers_for_test(&cur_hash);
new_env.head.height = i;
new_env.head.last_block_hash = cur_hash;
new_env.head.prev_block_hash = prev_hash;
Expand Down
14 changes: 14 additions & 0 deletions chain/chain/src/tests/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ fn do_fork(
)
.unwrap();
store_update.merge(epoch_manager_update.into());
// Mirror production so the nightly EarlyKickout aggregator reads seeded
// rows instead of tripping the missing-row assert.
store_update
.save_chunk_producers_for_header(
epoch_manager.as_ref(),
block.header(),
PROTOCOL_VERSION,
)
.unwrap();

let mut trie_changes_shards = Vec::new();
for shard_id in 0..num_shards {
Expand Down Expand Up @@ -766,6 +775,11 @@ fn add_block(
)
.unwrap();
store_update.merge(epoch_manager_update.into());
// Mirror production so the nightly EarlyKickout aggregator reads seeded rows
// instead of tripping the missing-row assert.
store_update
.save_chunk_producers_for_header(epoch_manager, block.header(), PROTOCOL_VERSION)
.unwrap();
store_update.commit().unwrap();
*prev_block = block.clone();
}
Expand Down
106 changes: 97 additions & 9 deletions chain/epoch-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,6 @@ impl EpochManager {
&epoch_info,
&shard_layout,
&prev_hash,
prev_height + 1,
);
let block_info = self.get_block_info(&cur_hash)?;
aggregator.update_tail(
Expand Down Expand Up @@ -1932,7 +1931,6 @@ impl EpochManager {
epoch_info: &EpochInfo,
shard_layout: &ShardLayout,
prev_hash: &CryptoHash,
height: BlockHeight,
) -> Option<HashMap<ShardId, ValidatorId>> {
// `DBCol::ChunkProducers` only exists under nightly (as does an enabled
// EarlyKickout); stable always uses the legacy sampler.
Expand All @@ -1955,39 +1953,129 @@ impl EpochManager {
if anchor_block_info.epoch_id() != epoch_id {
return None;
}
// Reproduce what the writer stored at the anchor: a sample at
// `anchor.height + 2`, not the chunk height. They differ only when
// `prev` skipped heights above `anchor` (e.g. post epoch-sync).
let anchor_sample_height =
anchor_block_info.height() + CHUNK_GRANDPARENT_ANCHOR_HEIGHT_OFFSET;
let mut chunk_producers = HashMap::new();
for shard_id in shard_layout.shard_ids() {
let key = get_block_shard_id(&anchor, shard_id);
let validator_id = match self
.store
.store_ref()
.get_ser::<ValidatorStake>(DBCol::ChunkProducers, &key)
.and_then(|stake| epoch_info.get_validator_id(stake.account_id()).copied())
{
Some(validator_id) => validator_id,
None => {
// Row absent (writer bug) or account not in the epoch;
// keep aggregation alive with the canonical sample.
// Only legitimate same-epoch miss: the epoch-sync first
// block, installed without seeding (client/src/sync/epoch.rs).
debug_assert!(
anchor == *anchor_block_info.epoch_first_block(),
"unexpected missing ChunkProducers row for non-first-block anchor {anchor} (writer bug?)"
);
tracing::debug!(
target: "epoch_tracker",
?anchor,
%shard_id,
"chunk producer missing in DB during aggregation, sampling canonically",
"chunk producer row absent during aggregation, sampling canonically",
);
epoch_info.sample_chunk_producer(shard_layout, shard_id, height)?
epoch_info.sample_chunk_producer(
shard_layout,
shard_id,
anchor_sample_height,
)?
}
Some(stake) => match epoch_info.get_validator_id(stake.account_id()).copied() {
Some(validator_id) => validator_id,
None => {
// Unreachable in correct operation: the same-epoch
// writer sampled this stake from this `epoch_info`.
debug_assert!(
false,
"account {} from ChunkProducers row not in epoch {epoch_id:?}",
stake.account_id()
);
tracing::debug!(
target: "epoch_tracker",
?anchor,
%shard_id,
"chunk producer not in epoch during aggregation, sampling canonically",
);
epoch_info.sample_chunk_producer(
shard_layout,
shard_id,
anchor_sample_height,
)?
}
},
};
chunk_producers.insert(shard_id, validator_id);
}
Some(chunk_producers)
}
#[cfg(not(feature = "nightly"))]
{
let _ = (epoch_id, epoch_info, shard_layout, prev_hash, height);
let _ = (epoch_id, epoch_info, shard_layout, prev_hash);
None
}
}

/// Test-only: seed `DBCol::ChunkProducers` for an already-recorded block,
/// mirroring `ChainStoreUpdate::save_chunk_producers_for_header` so nightly
/// tests exercise the DB-anchored path the harnesses otherwise never seed.
///
/// Must run after the block's `record_block_info` commit (reads the recorded
/// `BlockInfo`). Idempotent; EarlyKickout-gated (no-op on stable).
pub fn seed_chunk_producers_for_test(&self, block_hash: &CryptoHash) {
#[cfg(feature = "nightly")]
{
use near_primitives::utils::get_block_shard_id;
use near_store::DBCol;

let block_info =
self.get_block_info(block_hash).expect("block must be recorded before seeding");
// Gate on the anchor's own epoch, mirroring production's
// `get_epoch_protocol_version(header.epoch_id())`: a row exists iff the
// anchor's epoch has EarlyKickout, which is exactly when the aggregator's
// same-epoch read path applies. Gating on the epoch *after* the anchor
// would seed dead rows for last-of-epoch anchors across an activation edge.
let own_epoch_info =
self.get_epoch_info(block_info.epoch_id()).expect("anchor epoch info");
if !ProtocolFeature::EarlyKickout.enabled(own_epoch_info.protocol_version()) {
return;
}
// Sample from the epoch *after* the anchor, matching
// `save_chunk_producers_for_header`'s `get_epoch_id_from_prev_block`.
let epoch_id = if self
.is_next_block_epoch_start(block_hash)
.expect("block must be recorded before seeding")
{
self.get_next_epoch_id(block_hash).expect("next epoch id")
} else {
self.get_epoch_id(block_hash).expect("epoch id")
};
let epoch_info = self.get_epoch_info(&epoch_id).expect("epoch info");
let shard_layout = self.get_shard_layout(&epoch_id).expect("shard layout");
let height = block_info.height() + CHUNK_GRANDPARENT_ANCHOR_HEIGHT_OFFSET;
let mut store_update = self.store.store_ref().store_update();
for shard_id in shard_layout.shard_ids() {
if let Some(validator_id) =
epoch_info.sample_chunk_producer(&shard_layout, shard_id, height)
{
let validator_stake = epoch_info.get_validator(validator_id);
store_update.insert_ser(
DBCol::ChunkProducers,
&get_block_shard_id(block_hash, shard_id),
&validator_stake,
);
}
}
store_update.commit();
}
#[cfg(not(feature = "nightly"))]
let _ = block_hash;
}

/// Get the shard split to include in the block header, if any.
///
/// This method is expected to be called during the production of the last block of an epoch.
Expand Down
2 changes: 2 additions & 0 deletions chain/epoch-manager/src/shard_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ mod tests {
)
.unwrap()
.commit();
// Seed once: `record_blocks` funnels through here.
epoch_manager.seed_chunk_producers_for_test(&cur_h);
}

// Simulates block production over the given height range using the specified protocol version and block hashes.
Expand Down
5 changes: 5 additions & 0 deletions chain/epoch-manager/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ pub fn record_block_with_final_block_hash(
)
.unwrap()
.commit();
epoch_manager.seed_chunk_producers_for_test(&cur_h);
}

pub fn record_block(
Expand Down Expand Up @@ -403,6 +404,8 @@ pub fn record_block_with_version(
)
.unwrap()
.commit();
// Seed once: `record_block` and `record_blocks` funnel through here.
epoch_manager.seed_chunk_producers_for_test(&cur_h);
}

pub fn record_blocks<F>(
Expand Down Expand Up @@ -465,5 +468,7 @@ pub fn block_info(
}

pub fn record_with_block_info(epoch_manager: &mut EpochManager, block_info: BlockInfo) {
let hash = *block_info.hash();
epoch_manager.record_block_info(block_info, [0; 32]).unwrap().commit();
epoch_manager.seed_chunk_producers_for_test(&hash);
}
Loading
Loading