diff --git a/Cargo.lock b/Cargo.lock index 9ad9e595f77d6..14f64dee0e623 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4897,6 +4897,7 @@ dependencies = [ "cumulus-client-consensus-common", "cumulus-client-parachain-inherent", "cumulus-client-proof-size-recording", + "cumulus-client-unincluded-segment-store", "cumulus-primitives-aura", "cumulus-primitives-core", "cumulus-relay-chain-interface", @@ -4974,6 +4975,7 @@ dependencies = [ "sp-trie 29.0.0", "sp-version 29.0.0", "substrate-prometheus-endpoint 0.17.0", + "substrate-test-runtime", "tokio", "tracing", ] @@ -5146,6 +5148,22 @@ dependencies = [ "sp-trie 29.0.0", ] +[[package]] +name = "cumulus-client-unincluded-segment-store" +version = "0.1.0" +dependencies = [ + "cumulus-client-consensus-common", + "hex", + "parity-scale-codec", + "sc-client-api 28.0.0", + "sc-client-db", + "sp-blockchain 28.0.0", + "sp-runtime 31.0.1", + "sp-trie 29.0.0", + "substrate-test-runtime", + "tempfile", +] + [[package]] name = "cumulus-pallet-aura-ext" version = "0.7.0" @@ -5163,6 +5181,7 @@ dependencies = [ "scale-info", "sp-application-crypto 30.0.0", "sp-consensus-aura", + "sp-consensus-babe", "sp-core 28.0.0", "sp-io 30.0.0", "sp-keyring", diff --git a/Cargo.toml b/Cargo.toml index 81832c623461b..0e0f8697d3403 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ members = [ "cumulus/client/parachain-inherent", "cumulus/client/pov-recovery", "cumulus/client/proof-size-recording", + "cumulus/client/unincluded-segment-store", "cumulus/client/relay-chain-inprocess-interface", "cumulus/client/relay-chain-interface", "cumulus/client/relay-chain-minimal-node", @@ -770,6 +771,7 @@ cumulus-client-network = { path = "cumulus/client/network", default-features = f cumulus-client-parachain-inherent = { path = "cumulus/client/parachain-inherent", default-features = false } cumulus-client-pov-recovery = { path = "cumulus/client/pov-recovery", default-features = false } cumulus-client-proof-size-recording = { path = 
"cumulus/client/proof-size-recording", default-features = false } +cumulus-client-unincluded-segment-store = { path = "cumulus/client/unincluded-segment-store", default-features = false } cumulus-client-service = { path = "cumulus/client/service", default-features = false } cumulus-pallet-aura-ext = { path = "cumulus/pallets/aura-ext", default-features = false } cumulus-pallet-parachain-system = { path = "cumulus/pallets/parachain-system", default-features = false } diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 053577b0aeefc..0e388a37c7fd6 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -50,6 +50,7 @@ cumulus-client-collator = { workspace = true, default-features = true } cumulus-client-consensus-common = { workspace = true, default-features = true } cumulus-client-parachain-inherent = { workspace = true, default-features = true } cumulus-client-proof-size-recording = { workspace = true, default-features = true } +cumulus-client-unincluded-segment-store = { workspace = true, default-features = true } cumulus-primitives-aura = { workspace = true, default-features = true } cumulus-primitives-core = { workspace = true, default-features = true } cumulus-relay-chain-interface = { workspace = true, default-features = true } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index 88d68611a4de5..a6c484b66dfce 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -15,13 +15,13 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . 
-use super::CollatorMessage; +use super::{resubmission::ResubmissionRequest, CollatorMessage}; use crate::{ collator::{self as collator_util, BuildBlockAndImportParams, Collator, SlotClaim}, collators::{ check_validation_code_or_log, slot_based::{ - relay_chain_data_cache::RelayChainDataCache, + relay_chain_data_cache::{ArpEntry, ArpKey, RelayChainDataCache}, scheduling::SchedulingInfo, slot_timer::{SlotInfo, SlotTimer}, }, @@ -35,15 +35,20 @@ use cumulus_client_consensus_common::{ self as consensus_common, get_relay_slot, ParachainBlockImportMarker, }; use cumulus_client_proof_size_recording::prepare_proof_size_recording_aux_data; +use cumulus_client_unincluded_segment_store::{ + now_unix_ms, prepare_aux_data as prepare_unincluded_segment_aux_data, +}; use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot}; use cumulus_primitives_core::{ - BlockBundleInfo, ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem, - PersistedValidationData, RelayParentOffsetApi, SchedulingProof, SchedulingV3EnabledApi, - TargetBlockRate, + extract_relay_parent, BlockBundleInfo, ClaimQueueOffset, CoreInfo, CoreSelector, + CumulusDigestItem, PersistedValidationData, RelayParentOffsetApi, SchedulingProof, + SchedulingV3EnabledApi, TargetBlockRate, }; use cumulus_relay_chain_interface::RelayChainInterface; use futures::prelude::*; -use polkadot_primitives::{Block as RelayBlock, CoreIndex, Header as RelayHeader, Id as ParaId}; +use polkadot_primitives::{ + Block as RelayBlock, CoreIndex, Header as RelayHeader, Id as ParaId, SessionIndex, +}; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; use sc_consensus::BlockImport; use sc_consensus_aura::SlotDuration; @@ -110,6 +115,8 @@ pub struct BuilderTaskParams< pub collator_service: CS, /// Channel to send built blocks to the collation task. pub collator_sender: sc_utils::mpsc::TracingUnboundedSender>, + /// Channel to request unincluded-segment resubmission when the segment is full. 
+ pub resubmission_sender: sc_utils::mpsc::TracingUnboundedSender, /// Slot duration of the relay chain. pub relay_chain_slot_duration: Duration, /// Offset all time operations by this duration. @@ -185,6 +192,7 @@ where proposer, collator_service, collator_sender, + resubmission_sender, code_hash_provider, relay_chain_slot_duration, para_backend, @@ -263,17 +271,26 @@ where slot_timer.set_offset_by_scheduling_version(v3_enabled, slot_offset); + // V3 resubmission phase 1: hydrate the prior segment in parallel with slot prep. + if v3_enabled { + let _ = resubmission_sender.unbounded_send(ResubmissionRequest::HydrateSegment { + scheduling_parent_header: scheduling_parent_header.clone(), + }); + } + let relay_parent_offset = para_client .runtime_api() .relay_parent_offset(para_best_hash) .unwrap_or_default(); - let mut max_relay_parent_session_age = 0; - if v3_enabled { - max_relay_parent_session_age = relay_client - .max_relay_parent_session_age(scheduling_parent_hash) + let max_relay_parent_session_age = if v3_enabled { + relay_chain_data_cache + .get_by_hash(scheduling_parent_hash) .await - .unwrap_or(0); - } + .map(|d| d.max_relay_parent_session_age) + .unwrap_or(0) + } else { + 0 + }; let Ok(Some(relay_parent_data)) = offset_relay_parent_find_descendants( &mut relay_chain_data_cache, scheduling_parent_header.clone(), @@ -287,27 +304,46 @@ where let relay_parent_header = relay_parent_data.relay_parent().clone(); let relay_parent_hash = relay_parent_header.hash(); - let Some(parent_search_result) = crate::collators::find_parent( - relay_parent_hash, - para_id, - &*para_backend, - &relay_client, - |parent| { - // We never want to build on any "middle block" that isn't the last block in a - // core. - // When the digest item doesn't exist, we are running in compatibility - // mode and all parents are valid. 
- CumulusDigestItem::is_last_block_in_core(parent.digest()).unwrap_or(true) - }, - ) - .await - else { - continue; + // V3: build on the parachain best head directly, after confirming it descends from the + // included head the relay chain reports at the scheduling parent. The scheduling parent + // already pins the included head, so the relay-parent-anchored fork search is + // unnecessary. V1/V2 keep that search. + let (included_header, initial_parent_header) = if v3_enabled { + match find_parent_v3::( + scheduling_parent_hash, + para_id, + para_best_hash, + &*para_backend, + &relay_client, + &mut relay_chain_data_cache, + ) + .await + { + Some(parents) => parents, + None => continue, + } + } else { + let Some(parent_search_result) = crate::collators::find_parent( + relay_parent_hash, + para_id, + &*para_backend, + &relay_client, + |parent| { + // We never want to build on any "middle block" that isn't the last block in + // a core. When the digest item doesn't exist, we are running in + // compatibility mode and all parents are valid. + CumulusDigestItem::is_last_block_in_core(parent.digest()).unwrap_or(true) + }, + ) + .await + else { + continue; + }; + + (parent_search_result.included_header, parent_search_result.best_parent_header) }; - let included_header = parent_search_result.included_header; - let initial_parent_hash = parent_search_result.best_parent_header.hash(); - let initial_parent_header = parent_search_result.best_parent_header; + let initial_parent_hash = initial_parent_header.hash(); let unincluded_segment_len = initial_parent_header.number().saturating_sub(*included_header.number()); @@ -484,6 +520,19 @@ where "Core configuration", ); + // V3 resubmission phase 2: sign + assemble for the first core. The deadline drives + // the collation task's flush of the prior segment if the first core builds nothing. 
+ if v3_enabled { + let core_info = cores.core_info(); + let _ = resubmission_sender.unbounded_send(ResubmissionRequest::FinalizeSegment { + relay_parent_data: relay_parent_data.clone(), + core_index: cores.core_index(), + core_selector: core_info.selector, + claim_queue_offset: core_info.claim_queue_offset, + deadline: Instant::now() + slot_time.time_left(), + }); + } + let mut pov_parent_header = initial_parent_header; let mut pov_parent_hash = initial_parent_hash; let block_time = relay_chain_slot_duration / number_of_blocks; @@ -834,6 +883,12 @@ where ); } + let time_ms = now_unix_ms(); + prepare_unincluded_segment_aux_data::(parent_hash, time_ms, &built_block.proof) + .for_each(|(k, v)| { + import_block.auxiliary.push((k, Some(v))); + }); + if let Err(error) = collator.import_block(import_block).await { tracing::error!(target: LOG_TARGET, ?error, "Failed to import built block."); return Ok(None); @@ -903,13 +958,13 @@ where ); if let Err(err) = collator_sender.unbounded_send(CollatorMessage { - relay_parent: relay_parent_hash, scheduling_proof, + core_index, + relay_parent: relay_parent_hash, parent_header: pov_parent_header.clone(), blocks, proof, validation_code_hash, - core_index, validation_data, }) { tracing::error!(target: LOG_TARGET, ?err, "Unable to send block to collation task."); @@ -1017,6 +1072,138 @@ where ))) } +/// V3 parent selection. +/// +/// The scheduling parent pins the included head via the relay chain's persisted validation data, +/// so we anchor on it instead of running [`crate::collators::find_parent`]'s relay-anchored fork +/// search. Like V2, we prefer the pending-availability head as the search start when one exists. +/// The slot-pinned `para_best_hash` is the candidate tip; we then: +/// +/// 1. Pick `start` as pending-availability head if reported and locally known, else included. +/// 2. Confirm `start` is an ancestor of best. +/// 3. 
Walk the chain from `start` to best and find the longest prefix whose relay parents are still +/// in the relay chain's allowed list at `scheduling_parent` (via `ancestor_relay_parent_info`). +/// A block whose relay parent has aged out forces a fork off its parent. +/// 4. From that valid tip, walk back to the nearest last-in-core ancestor (the import path doesn't +/// gate "best" on bundle completion, so best can be a middle-of-bundle block). +/// +/// Returns `(included_header, parent_header)` or `None` if the included head is unknown locally, +/// a reported pending head is unusable, or `para_best_hash` is on a different fork. +async fn find_parent_v3( + scheduling_parent: RelayHash, + para_id: ParaId, + para_best_hash: Block::Hash, + para_backend: &B, + relay_client: &RClient, + relay_chain_data_cache: &mut RelayChainDataCache, +) -> Option<(Block::Header, Block::Header)> +where + Block: BlockT, + B: sc_client_api::Backend, + RClient: RelayChainInterface + Clone + 'static, +{ + let cumulus_client_consensus_common::ParentSearchStart { + included_header, + start_hash, + start_header, + } = cumulus_client_consensus_common::pick_parent_search_start::( + relay_client, + para_backend, + para_id, + scheduling_parent, + ) + .await + .ok() + .flatten()?; + + if para_best_hash != start_hash { + let route = + sp_blockchain::tree_route(para_backend.blockchain(), start_hash, para_best_hash) + .ok()?; + if !route.retracted().is_empty() { + tracing::debug!( + target: LOG_TARGET, + ?start_hash, + ?para_best_hash, + "Para best head is not a descendant of the search start; skipping slot.", + ); + return None; + } + } + + // Walk best→start collecting the chain, then reverse to oldest-first. 
+ let mut chain: Vec = Vec::new(); + let mut cur_hash = para_best_hash; + while cur_hash != start_hash { + let h = para_backend.blockchain().header(cur_hash).ok().flatten()?; + cur_hash = *h.parent_hash(); + chain.push(h); + } + chain.reverse(); + + let (current_session, max_session_age) = { + let data = relay_chain_data_cache.get_by_hash(scheduling_parent).await.ok()?; + (data.session_index, data.max_relay_parent_session_age) + }; + + // Longest valid prefix from start. Default to start if even the first block is invalid. + let mut valid_tip_header = start_header; + for header in &chain { + let Some(relay_parent) = extract_relay_parent(header.digest()) else { break }; + if !is_relay_parent_allowed( + relay_chain_data_cache, + scheduling_parent, + current_session, + max_session_age, + relay_parent, + ) + .await + { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?header.hash(), + ?relay_parent, + "Unincluded parablock's relay parent is no longer allowed; forking off its parent.", + ); + break; + } + valid_tip_header = header.clone(); + } + + let mut parent_header = valid_tip_header; + while !CumulusDigestItem::is_last_block_in_core(parent_header.digest()).unwrap_or(true) { + parent_header = + para_backend.blockchain().header(*parent_header.parent_hash()).ok().flatten()?; + } + + Some((included_header, parent_header)) +} + +/// Check whether `relay_parent` is in `at`'s allowed relay parents for some session within +/// `[current_session - max_session_age, current_session]`. Iterates newest-session first so the +/// common case (relay parent in the current session) is a single cache lookup. 
+async fn is_relay_parent_allowed( + relay_chain_data_cache: &mut RelayChainDataCache, + at: RelayHash, + current_session: SessionIndex, + max_session_age: u32, + relay_parent: RelayHash, +) -> bool +where + RClient: RelayChainInterface + Clone + 'static, +{ + let oldest_session = current_session.saturating_sub(max_session_age); + for session_index in (oldest_session..=current_session).rev() { + let key = ArpKey { at, session_index, relay_parent }; + match relay_chain_data_cache.ancestor_relay_parent_info(key).await { + Ok(ArpEntry { info: Some(_) }) => return true, + Ok(ArpEntry { info: None }) => continue, + Err(()) => return false, + } + } + false +} + /// Return value of [`determine_cores`]. pub struct Cores { selector: CoreSelector, diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs index 399beae88f2d8..c69bc4b866676 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_import.rs @@ -17,7 +17,11 @@ use crate::LOG_TARGET; use codec::{Decode, Encode}; +use cumulus_client_consensus_common::old_finalized_hash; use cumulus_client_proof_size_recording::prepare_proof_size_recording_aux_data; +use cumulus_client_unincluded_segment_store::{ + now_unix_ms, prepare_aux_data as prepare_unincluded_segment_aux_data, +}; use cumulus_primitives_core::{BlockBundleInfo, CoreInfo, CumulusDigestItem, RelayBlockIdentifier}; use futures::{stream::FusedStream, StreamExt}; use sc_client_api::{ @@ -100,14 +104,11 @@ where { let client_for_closure = client.clone(); let on_finality = move |notification: &FinalityNotification| -> AuxDataOperations { - // The old finalized block is the parent of the first block in the tree route, - // or the parent of the finalized block if the tree route is empty. 
- let old_finalized_hash = notification - .tree_route - .first() - .and_then(|hash| client_for_closure.header(*hash).ok().flatten()) - .map(|h| *h.parent_hash()) - .unwrap_or_else(|| *notification.header.parent_hash()); + let old_finalized = old_finalized_hash::<_, Block>( + &*client_for_closure, + &notification.tree_route, + *notification.header.parent_hash(), + ); notification .stale_blocks @@ -125,7 +126,7 @@ where .copied() .map(|hash| (ignored_nodes_key(hash), None)), ) - .chain(std::iter::once((ignored_nodes_key(old_finalized_hash), None))) + .chain(std::iter::once((ignored_nodes_key(old_finalized), None))) .collect() }; @@ -212,6 +213,7 @@ impl SlotBasedBlockImport { fn execute_block_and_collect_storage_proof( &self, params: &mut sc_consensus::BlockImportParams, + time_ms: u64, ) -> Result<(), sp_consensus::Error> where Client: ProvideRuntimeApi @@ -315,6 +317,12 @@ impl SlotBasedBlockImport { }); } + prepare_unincluded_segment_aux_data::(block_hash, time_ms, &storage_proof).for_each( + |(k, v)| { + params.auxiliary.push((k, Some(v))); + }, + ); + params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(gen_storage_changes)); @@ -352,11 +360,13 @@ where &self, mut params: sc_consensus::BlockImportParams, ) -> Result { + let time_ms = now_unix_ms(); + if !(params.origin == BlockOrigin::Own || params.with_state() || params.state_action.skip_execution_checks()) { - self.execute_block_and_collect_storage_proof(&mut params)?; + self.execute_block_and_collect_storage_proof(&mut params, time_ms)?; } self.inner.import_block(params).await.map_err(Into::into) diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs index 2ddff00c40421..f54777cc9dfde 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -15,12 +15,15 @@ // You
should have received a copy of the GNU General Public License // along with Cumulus. If not, see . -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; +use codec::Codec; use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::ValidationCodeHashProvider; +use cumulus_client_unincluded_segment_store::UnincludedSegmentStore; use cumulus_relay_chain_interface::RelayChainInterface; -use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams}; +use polkadot_node_primitives::{MaybeCompressedPoV, SegmentCollation, SubmitSegmentParams}; use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{CollatorPair, Id as ParaId}; @@ -29,15 +32,28 @@ use cumulus_primitives_core::relay_chain::BlockId; use futures::prelude::*; use crate::export_pov_to_path; +use sc_client_api::{backend::AuxStore, Backend}; +use sc_network_types::PeerId; use sc_utils::mpsc::TracingUnboundedReceiver; -use sp_runtime::traits::{Block as BlockT, Header}; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_consensus_aura::AuraApi; +use sp_core::crypto::Pair; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header, Member}; -use super::CollatorMessage; +use super::{ + resubmission::{ResubmissionManager, ResubmissionRequest}, + unincluded_segment::{HydratedSegment, PriorSegment, Segment}, + CollatorMessage, +}; +use std::time::Instant; const LOG_TARGET: &str = "aura::cumulus::collation_task"; /// Parameters for the collation task. -pub struct Params { +pub struct Params { /// A handle to the relay-chain client. pub relay_client: RClient, /// The collator key used to sign collations before submitting to validators. 
@@ -54,15 +70,26 @@ pub struct Params { pub block_import_handle: super::SlotBasedBlockImportHandle, /// When set, the collator will export every produced `POV` to this folder. pub export_pov: Option, + /// Parachain backend — used by the resubmission manager to read unincluded block bodies. + pub para_backend: Arc, + /// Parachain client — used by the resubmission manager for Aura authorities + slot duration. + pub para_client: Arc, + /// Keystore used by the resubmission manager to sign scheduling info. + pub keystore: KeystorePtr, + /// Collator network peer id, recorded in resubmission signed payloads. + pub collator_peer_id: PeerId, + /// Validation code hash provider — used by the resubmission manager. + pub code_hash_provider: CHP, + /// Per-block storage-proof store — used by the resubmission manager. + pub store: UnincludedSegmentStore, + /// Receiver channel for resubmission requests from the block builder task. + pub resubmission_receiver: TracingUnboundedReceiver, } -/// Asynchronously executes the collation task for a parachain. -/// -/// This function initializes the collator subsystems necessary for producing and submitting -/// collations to the relay chain. It listens for new best relay chain block notifications and -/// handles collator messages. If our parachain is scheduled on a core and we have a candidate, -/// the task will build a collation and send it to the relay chain. -pub async fn run_collation_task( +/// Run the collation task. Consumes [`CollatorMessage`]s from the builder and +/// [`ResubmissionRequest`]s driving a [`ResubmissionManager`]; both converge on +/// [`handle_segment`]. 
+pub async fn run_collation_task( Params { relay_client, collator_key, @@ -72,11 +99,25 @@ pub async fn run_collation_task( mut collator_receiver, mut block_import_handle, export_pov, - }: Params, + para_backend, + para_client, + keystore, + collator_peer_id, + code_hash_provider, + store, + mut resubmission_receiver, + }: Params, ) where Block: BlockT, - CS: CollatorServiceInterface + Send + Sync + 'static, + B: Backend + 'static, + Client: ProvideRuntimeApi + HeaderBackend + AuxStore + Send + Sync + 'static, + Client::Api: AuraApi + ApiExt, RClient: RelayChainInterface + Clone + 'static, + CHP: ValidationCodeHashProvider + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + P: Pair + Send + Sync + 'static, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, { let Ok(mut overseer_handle) = relay_client.overseer_handle() else { tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); @@ -91,126 +132,193 @@ pub async fn run_collation_task( ) .await; + let mut resubmission_manager = ResubmissionManager::::new( + para_id, + keystore, + collator_peer_id, + para_backend, + para_client, + relay_client.clone(), + code_hash_provider, + store, + ); + + let mut hydrated_prior: Option> = None; + let mut pending_first_core: Option> = None; + loop { + // Sleeps until the held prior segment's deadline; never resolves when nothing is held. + let next_deadline = pending_first_core.as_ref().map(|p| p.deadline); + let deadline_sleep = async move { + match next_deadline { + Some(d) => tokio::time::sleep(d.saturating_duration_since(Instant::now())).await, + None => futures::future::pending::<()>().await, + } + }; + futures::pin_mut!(deadline_sleep); + futures::select! 
{ collator_message = collator_receiver.next() => { - let Some(message) = collator_message else { - return; + let Some(msg) = collator_message else { return }; + + let segment = match pending_first_core.take_if(|p| p.core_index == msg.core_index) { + Some(prior) => prior.merge_first_core(msg), + None => Segment::singleton(msg), }; + handle_segment(segment, &collator_service, &mut overseer_handle, relay_client.clone(), export_pov.clone()).await; + }, + resubmission_request = resubmission_receiver.next() => { + let Some(request) = resubmission_request else { return }; - handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await; + match request { + ResubmissionRequest::HydrateSegment { scheduling_parent_header } => { + hydrated_prior = + resubmission_manager.hydrate(scheduling_parent_header).await; + }, + ResubmissionRequest::FinalizeSegment { + relay_parent_data, + core_index, + core_selector, + claim_queue_offset, + deadline, + } => + if let Some(hydrated) = hydrated_prior.take() { + pending_first_core = resubmission_manager + .finalize( + hydrated, + relay_parent_data, + core_index, + core_selector, + claim_queue_offset, + deadline, + ) + .await; + }, + } + }, + _ = deadline_sleep.fuse() => { + // First core built nothing this slot: submit prior alone. + if let Some(prior) = pending_first_core.take() { + handle_segment(prior.into_segment(), &collator_service, &mut overseer_handle, relay_client.clone(), export_pov.clone()).await; + } }, block_import_msg = block_import_handle.next().fuse() => { - // TODO: Implement me. - // Issue: https://github.com/paritytech/polkadot-sdk/issues/6495 + // TODO: https://github.com/paritytech/polkadot-sdk/issues/6495 let _ = block_import_msg; } } } } -/// Handle an incoming collation message from the block builder task. -/// This builds the collation from the [`CollatorMessage`] and submits it to -/// the collation-generation subsystem of the relay chain. 
-async fn handle_collation_message( - message: CollatorMessage, +/// Build a [`SegmentCollation`] per entry and forward them as one `SubmitSegment` message. +/// Per-entry failures (collation build / session lookup) are skipped, not fatal. +async fn handle_segment( + segment: Segment, collator_service: &impl CollatorServiceInterface, overseer_handle: &mut OverseerHandle, relay_client: RClient, export_pov: Option, ) { - let CollatorMessage { - scheduling_proof, - parent_header, - blocks, - proof, - validation_code_hash, - relay_parent, - core_index, - validation_data, - } = message; - - // Derive scheduling_parent from the proof (the ISP header's hash is used when the - // header chain is empty — that's the case with `relay_parent_offset = 0`). + let Segment { scheduling_proof, core_index, entries } = segment; + + // Entries may have different relay parents; `None` means V2 descriptors throughout. let scheduling_parent = scheduling_proof.as_ref().map(|p| p.scheduling_parent()); - let (collation, block_data) = match collator_service.build_multi_block_collation( - &parent_header, - blocks, - proof, - scheduling_proof, - ) { - Some(collation) => collation, - None => { - tracing::warn!(target: LOG_TARGET, ?core_index, "Unable to build collation."); - return; - }, - }; - block_data.log_size_info(); - - if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { - if let Some(pov_path) = export_pov { - if let Ok(Some(relay_parent_header)) = - relay_client.header(BlockId::Hash(relay_parent)).await - { - if let Some(header) = block_data.blocks().first().map(|b| b.header()) { - export_pov_to_path::( - pov_path.clone(), - pov.clone(), - header.hash(), - *header.number(), - parent_header.clone(), - relay_parent_header.state_root, - relay_parent_header.number, - validation_data.max_pov_size, - ); + let mut collations = Vec::with_capacity(entries.len()); + for entry in entries { + let relay_parent = entry.relay_parent; + let validation_code_hash = 
entry.validation_code_hash; + let validation_data = entry.validation_data; + let parent_header = entry.parent_header; + + let (collation, block_data) = match collator_service.build_multi_block_collation( + &parent_header, + entry.blocks, + entry.proof, + scheduling_proof.clone(), + ) { + Some(collation) => collation, + None => { + tracing::warn!(target: LOG_TARGET, ?core_index, ?relay_parent, "Unable to build collation for segment entry."); + continue; + }, + }; + + block_data.log_size_info(); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + if let Some(pov_path) = export_pov.clone() { + if let Ok(Some(relay_parent_header)) = + relay_client.header(BlockId::Hash(relay_parent)).await + { + if let Some(header) = block_data.blocks().first().map(|b| b.header()) { + export_pov_to_path::( + pov_path, + pov.clone(), + header.hash(), + *header.number(), + parent_header.clone(), + relay_parent_header.state_root, + relay_parent_header.number, + validation_data.max_pov_size, + ); + } + } else { + tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}"); } - } else { - tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}"); } + + tracing::info!( + target: LOG_TARGET, + block_numbers = ?block_data.blocks().iter().map(|b| *b.header().number()).collect::>(), + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); } - tracing::info!( + let session_index = match relay_client.session_index_for_child(relay_parent).await { + Ok(session_index) => session_index, + Err(err) => { + tracing::error!( + target: LOG_TARGET, + ?err, + ?relay_parent, + "Failed to fetch session index for segment entry.", + ); + continue; + }, + }; + + tracing::debug!( target: LOG_TARGET, + ?core_index, + ?relay_parent, block_numbers = ?block_data.blocks().iter().map(|b| *b.header().number()).collect::>(), - "Compressed PoV size: {}kb", - pov.block_data.0.len() as f64 
/ 1024f64, + "Adding entry to segment for core.", ); - } - let session_index = match relay_client.session_index_for_child(relay_parent).await { - Ok(session_index) => session_index, - Err(err) => { - tracing::error!( - target: LOG_TARGET, - ?err, - ?relay_parent, - "Failed to fetch session index." - ); - return; - }, - }; + collations.push(SegmentCollation { + relay_parent, + collation, + validation_code_hash, + result_sender: None, + session_index, + validation_data, + }); + } - tracing::debug!( - target: LOG_TARGET, - ?core_index, - block_numbers = ?block_data.blocks().iter().map(|b| *b.header().number()).collect::>(), - "Submitting collation for core.", - ); + if collations.is_empty() { + return; + } overseer_handle .send_msg( - CollationGenerationMessage::SubmitCollation(SubmitCollationParams { - relay_parent, - collation, - validation_code_hash, - core_index, - result_sender: None, + CollationGenerationMessage::SubmitSegment(SubmitSegmentParams { scheduling_parent, - session_index, - validation_data, + core_index, + collations, }), - "SubmitCollation", + "SubmitSegment", ) .await; } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs index f309b6751eba4..32c9d1e7c6335 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -74,6 +74,7 @@ use codec::Codec; use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; use cumulus_client_proof_size_recording::register_proof_size_recording_cleanup; +use cumulus_client_unincluded_segment_store::UnincludedSegmentStore; use cumulus_primitives_aura::AuraUnincludedSegmentApi; use cumulus_primitives_core::{ KeyToIncludeInRelayProof, RelayParentOffsetApi, SchedulingProof, SchedulingV3EnabledApi, @@ -107,8 +108,10 @@ mod 
block_builder_task; mod block_import; mod collation_task; mod relay_chain_data_cache; +mod resubmission; mod scheduling; mod slot_timer; +mod unincluded_segment; #[cfg(test)] mod tests; @@ -188,7 +191,7 @@ pub fn run + ParachainBlockImportMarker + Send + Sync + 'static, Proposer: Environment + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + Clone + 'static, - CHP: consensus_common::ValidationCodeHashProvider + Send + Sync + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Clone + Send + Sync + 'static, P: Pair + Send + Sync + 'static, P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, @@ -219,7 +222,13 @@ pub fn run::new(para_client.clone()); + unincluded_segment_store.register_cleanup(); + let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100); + let (resubmission_tx, resubmission_rx) = tracing_unbounded("mpsc_builder_to_resubmission", 100); let collator_task_params = collation_task::Params { relay_client: relay_client.clone(), collator_key, @@ -229,9 +238,16 @@ pub fn run(collator_task_params); + let collation_task_fut = run_collation_task::(collator_task_params); let block_builder_params = block_builder_task::BuilderTaskParams { create_inherent_data_providers, @@ -246,6 +262,7 @@ pub fn run { - /// The hash of the relay chain block that provides the context for the parachain block. - pub relay_parent: RelayHash, - /// V3 scheduling proof. None for V1/V2 candidates. pub scheduling_proof: Option, - /// The header of the parent block. + pub core_index: CoreIndex, + pub relay_parent: RelayHash, pub parent_header: Block::Header, - /// The built blocks. pub blocks: Vec, - /// The storage proof that was collected while building all the blocks. pub proof: StorageProof, - /// The validation code hash at the parent block. 
pub validation_code_hash: ValidationCodeHash, - /// Core index that this block should be submitted on - pub core_index: CoreIndex, - /// The persisted validation data for this collation. pub validation_data: PersistedValidationData, } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/relay_chain_data_cache.rs b/cumulus/client/consensus/aura/src/collators/slot_based/relay_chain_data_cache.rs index b2dbdb9fa7d51..f0dae7ea2b26a 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/relay_chain_data_cache.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/relay_chain_data_cache.rs @@ -18,11 +18,11 @@ //! Utility for caching [`RelayChainData`] for different relay blocks. use crate::collators::claim_queue_at; -use cumulus_relay_chain_interface::RelayChainInterface; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayParentInfo}; use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot; use polkadot_primitives::{ - node_features::FeatureIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, - NodeFeatures, OccupiedCoreAssumption, + node_features::FeatureIndex, BlockNumber, Hash as RelayHash, Header as RelayHeader, + Id as ParaId, NodeFeatures, OccupiedCoreAssumption, SessionIndex, }; use sp_runtime::generic::BlockId; @@ -37,6 +37,10 @@ pub struct RelayChainData { pub max_pov_size: u32, /// The node features at the relay parent. pub node_features: NodeFeatures, + /// Maximum number of sessions back a relay parent may be from this block. + pub max_relay_parent_session_age: u32, + /// Session index expected at the child of this relay block. + pub session_index: SessionIndex, } impl RelayChainData { @@ -45,12 +49,29 @@ impl RelayChainData { } } +/// Key into the `ancestor_relay_parent_info` cache: the relay block we query at, the session, and +/// the candidate relay parent. 
+#[derive(Clone, Hash, Eq, PartialEq, Debug)] +pub struct ArpKey { + pub at: RelayHash, + pub session_index: SessionIndex, + pub relay_parent: RelayHash, +} + +/// Cached result of an `ancestor_relay_parent_info` lookup. `info` is `Some` if `relay_parent` +/// was in `at`'s allowed list for `session_index`, `None` otherwise. +#[derive(Clone, Debug)] +pub struct ArpEntry { + pub info: Option>, +} + /// Simple helper to fetch relay chain data and cache it based on the current relay chain best block /// hash. pub struct RelayChainDataCache { relay_client: RI, para_id: ParaId, cached_data: schnellru::LruMap, + cached_arp: schnellru::LruMap, } impl RelayChainDataCache @@ -63,7 +84,32 @@ where para_id, // 50 cached relay chain blocks should be more than enough. cached_data: schnellru::LruMap::new(schnellru::ByLength::new(50)), + cached_arp: schnellru::LruMap::new(schnellru::ByLength::new(500)), + } + } + + /// Cached lookup of [`RelayChainInterface::ancestor_relay_parent_info`]. + pub async fn ancestor_relay_parent_info(&mut self, key: ArpKey) -> Result { + if let Some(cached) = self.cached_arp.get(&key) { + return Ok(cached.clone()); } + + let info = self + .relay_client + .ancestor_relay_parent_info(key.at, key.session_index, key.relay_parent) + .await + .map_err(|err| { + tracing::debug!( + target: crate::LOG_TARGET, + ?err, + ?key, + "ancestor_relay_parent_info call failed.", + ); + })?; + + let entry = ArpEntry { info }; + self.cached_arp.insert(key, entry.clone()); + Ok(entry) } /// Fetch required [`RelayChainData`] from the relay chain. 
@@ -151,7 +197,41 @@ where }, }; - Ok(RelayChainData { relay_header, claim_queue, max_pov_size, node_features }) + let max_relay_parent_session_age = self + .relay_client + .max_relay_parent_session_age(relay_hash) + .await + .unwrap_or_else(|err| { + tracing::debug!( + target: crate::LOG_TARGET, + ?relay_hash, + ?err, + "Failed to fetch max_relay_parent_session_age; defaulting to 0.", + ); + 0 + }); + + let session_index = match self.relay_client.session_index_for_child(relay_hash).await { + Ok(session) => session, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_hash, + ?err, + "Unable to fetch session index for child." + ); + return Err(()); + }, + }; + + Ok(RelayChainData { + relay_header, + claim_queue, + max_pov_size, + node_features, + max_relay_parent_session_age, + session_index, + }) } #[cfg(test)] diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/resubmission.rs b/cumulus/client/consensus/aura/src/collators/slot_based/resubmission.rs new file mode 100644 index 0000000000000..5baf2bd5848b4 --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/resubmission.rs @@ -0,0 +1,390 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Unincluded-segment resubmission for the slot-based collator. 
+//! +//! Every V3 slot the collator resubmits its prior unincluded segment on the first core (prepended +//! to whatever new blocks it builds there). The [`ResubmissionManager`] runs in two phases so the +//! walk + load (independent of the relay parent / first core) overlaps slot preparation: +//! +//! 1. [`hydrate`](ResubmissionManager::hydrate) — fired once the scheduling parent is known. +//! 2. [`finalize`](ResubmissionManager::finalize) — fired once the relay parent and first core are +//! known. The signed scheduling proof must verify against the on-chain verifier in +//! . + +use super::{ + relay_chain_data_cache::RelayChainDataCache, + unincluded_segment::{HydratedSegment, PriorSegment, SegmentEntry}, +}; +use crate::collators::RelayParentData; +use codec::{Codec, Encode}; +use cumulus_client_consensus_common::{get_relay_slot, ValidationCodeHashProvider}; +use cumulus_client_unincluded_segment_store::UnincludedSegmentStore; +use cumulus_primitives_core::{ + extract_relay_parent, ClaimQueueOffset, PersistedValidationData, SchedulingInfoPayload, + SchedulingProof, SignedSchedulingInfo, +}; +use cumulus_relay_chain_interface::RelayChainInterface; +use polkadot_primitives::{ + ApprovedPeerId, CoreIndex, CoreSelector, Header as RelayHeader, Id as ParaId, +}; +use sc_client_api::{backend::AuxStore, Backend}; +use sc_consensus_aura::standalone as aura_internal; +use sc_network_types::PeerId; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_application_crypto::{AppCrypto, AppPublic}; +use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; +use sp_consensus_aura::{AuraApi, Slot}; +use sp_core::{crypto::Pair, ByteArray}; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; +use std::{marker::PhantomData, sync::Arc, time::Instant}; + +const LOG_TARGET: &str = "consensus::slot_based::resubmission"; + +/// Polkadot/Kusama relay chain slot duration in milliseconds. 
Mirrors the constant the verifier +/// uses to map a relay slot to a parachain slot. +const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000; + +/// Block-builder → resubmission request. Two phases on one channel. +pub enum ResubmissionRequest { + /// Phase 1: walk + load the prior unincluded segment at the scheduling parent. + HydrateSegment { + scheduling_parent_header: RelayHeader, + }, + /// Phase 2: sign + assemble the held hydrated segment for the first core. + FinalizeSegment { + /// Internal scheduling parent (= relay parent the builder authors against) plus its + /// descendants up to the scheduling parent. + relay_parent_data: RelayParentData, + core_index: CoreIndex, + /// Overrides the blocks' UMP core selection on resubmission. + core_selector: CoreSelector, + claim_queue_offset: ClaimQueueOffset, + /// Submission deadline. Drives both signing abort and the collation task's flush of an + /// unmerged prior segment. + deadline: Instant, + }, +} + +/// Owns the persistent state needed to turn the prior unincluded segment into a resubmitted, +/// signed segment. Constructed once and driven by the collation task across the two phases +/// ([`hydrate`](Self::hydrate) then [`finalize`](Self::finalize)) of each slot's +/// [`ResubmissionRequest`]s. +pub struct ResubmissionManager { + para_id: ParaId, + keystore: KeystorePtr, + collator_peer_id: PeerId, + para_backend: Arc, + para_client: Arc, + relay_client: RClient, + code_hash_provider: CHP, + store: UnincludedSegmentStore, + relay_chain_data_cache: RelayChainDataCache, + _phantom: PhantomData P>, +} + +impl ResubmissionManager +where + Block: BlockT, + B: Backend, + Client: ProvideRuntimeApi + HeaderBackend + AuxStore, + Client::Api: AuraApi + ApiExt, + RClient: RelayChainInterface + Clone + 'static, + CHP: ValidationCodeHashProvider, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + /// Create a new manager. 
`relay_client` seeds the internal relay-chain data cache and is also + /// used directly by the unincluded-segment walker. + #[allow(clippy::too_many_arguments)] + pub fn new( + para_id: ParaId, + keystore: KeystorePtr, + collator_peer_id: PeerId, + para_backend: Arc, + para_client: Arc, + relay_client: RClient, + code_hash_provider: CHP, + store: UnincludedSegmentStore, + ) -> Self { + let relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id); + Self { + para_id, + keystore, + collator_peer_id, + para_backend, + para_client, + relay_client, + code_hash_provider, + store, + relay_chain_data_cache, + _phantom: PhantomData, + } + } + + /// Walk the prior unincluded segment at `scheduling_parent_header` and load each block's body + /// + stored proof. Returns `None` for an empty segment or if any block fails to hydrate. + pub async fn hydrate( + &mut self, + scheduling_parent_header: RelayHeader, + ) -> Option> { + let segment = super::unincluded_segment::reconstruct_unincluded_segment::( + scheduling_parent_header, + self.para_id, + &*self.para_backend, + &self.relay_client, + &mut self.relay_chain_data_cache, + ) + .await; + + // Anchor on the walked segment's tip — re-reading the client best could race a concurrent + // import onto a different fork. 
+ let authority_anchor = segment.last()?.hash(); + + let mut entries = Vec::with_capacity(segment.len()); + for header in segment { + match build_entry::( + header, + &*self.para_backend, + &self.code_hash_provider, + &self.store, + &mut self.relay_chain_data_cache, + ) + .await + { + Some(entry) => entries.push(entry), + None => { + tracing::warn!( + target: LOG_TARGET, + "Aborting resubmission: failed to hydrate an unincluded block.", + ); + return None; + }, + } + } + + Some(HydratedSegment { entries, authority_anchor }) + } + + /// Build the `header_chain`, sign the scheduling proof for the first core, and return a + /// [`PriorSegment`] carrying the held entries and the original `deadline`. Returns `None` if + /// the deadline has passed or the local keystore does not control the eligible author. + pub async fn finalize( + &mut self, + hydrated: HydratedSegment, + mut relay_parent_data: RelayParentData, + core_index: CoreIndex, + core_selector: CoreSelector, + claim_queue_offset: ClaimQueueOffset, + deadline: Instant, + ) -> Option> { + if Instant::now() >= deadline { + tracing::debug!(target: LOG_TARGET, "Dropping resubmission finalize: deadline already past."); + return None; + } + + let internal_scheduling_parent_header = relay_parent_data.relay_parent().clone(); + let internal_scheduling_parent = internal_scheduling_parent_header.hash(); + // Descendants are oldest→newest; `header_chain` is newest-first. 
+ let header_chain: Vec = + relay_parent_data.take_descendants().into_iter().rev().collect(); + + let payload = SchedulingInfoPayload::new( + core_selector, + claim_queue_offset.0, + peer_id_to_approved(self.collator_peer_id), + internal_scheduling_parent, + ); + let signed_scheduling_info = match sign_scheduling_info::( + payload, + &internal_scheduling_parent_header, + &self.para_client, + hydrated.authority_anchor, + &self.keystore, + ) + .await + { + Some(signed) => signed, + None => { + tracing::debug!( + target: LOG_TARGET, + ?internal_scheduling_parent, + "Skipping resubmission: not the eligible author for this slot (no local key)." + ); + return None; + }, + }; + + let scheduling_proof = SchedulingProof { + header_chain, + internal_scheduling_parent_header, + signed_scheduling_info: Some(signed_scheduling_info), + }; + + tracing::debug!( + target: LOG_TARGET, + ?core_index, + ?internal_scheduling_parent, + entries = hydrated.entries.len(), + "Finalized resubmitted prior segment for the first core.", + ); + + Some(PriorSegment { scheduling_proof, core_index, deadline, entries: hydrated.entries }) + } +} + +/// Rebuild one [`SegmentEntry`] for an unincluded parablock by combining its header, body, and +/// the storage proof captured at import time. 
+// NOTE(review): the generic parameter lists in this signature look stripped in this copy of the
+// diff (e.g. `Option>`); they are left untouched here — confirm against the real source.
+async fn build_entry(
+	header: Block::Header,
+	para_backend: &B,
+	code_hash_provider: &CHP,
+	store: &UnincludedSegmentStore,
+	relay_chain_data_cache: &mut RelayChainDataCache,
+) -> Option>
+where
+	Block: BlockT,
+	B: Backend,
+	Client: AuxStore,
+	RClient: RelayChainInterface + Clone + 'static,
+	CHP: ValidationCodeHashProvider,
+{
+	// Every lookup below short-circuits with `?`: a single missing piece makes the whole entry
+	// `None`, and the lookup errors themselves are discarded via `.ok()`.
+	let block_hash = header.hash();
+	let parent_hash = *header.parent_hash();
+
+	// The relay parent the block was built against is read from the block's own digest.
+	let relay_parent = extract_relay_parent(header.digest())?;
+
+	// Parent header and block body both come from the local parachain backend.
+	let parent_header = para_backend.blockchain().header(parent_hash).ok().flatten()?;
+	let body = para_backend.blockchain().body(block_hash).ok().flatten()?;
+	let block = Block::new(header, body);
+
+	// The store is keyed by block hash; only the `proof` field of the loaded record is used.
+	let stored = store.load(block_hash).ok().flatten()?;
+
+	// Validation data is rebuilt from the encoded parent header plus the relay header and
+	// `max_pov_size` cached for the block's relay parent.
+	let relay_data = relay_chain_data_cache.get_by_hash(relay_parent).await.ok()?;
+	let validation_data = PersistedValidationData {
+		parent_head: parent_header.encode().into(),
+		relay_parent_number: *relay_data.relay_header.number(),
+		relay_parent_storage_root: *relay_data.relay_header.state_root(),
+		max_pov_size: relay_data.max_pov_size,
+	};
+
+	// The code hash is looked up at the parent block, not at the block being resubmitted.
+	let validation_code_hash = code_hash_provider.code_hash_at(parent_hash)?;
+
+	// One entry per parablock: `blocks` always holds exactly the block rebuilt above.
+	Some(SegmentEntry {
+		relay_parent,
+		parent_header,
+		blocks: vec![block],
+		proof: stored.proof,
+		validation_code_hash,
+		validation_data,
+	})
+}
+
+/// Sign `payload` as the eligible Aura author for the slot derived from the BABE pre-digest of
+/// `internal_scheduling_parent_header`. Returns `None` when no BABE slot is found, the slot
+/// duration is unavailable, or the local keystore does not control the eligible author.
+///
+/// Slot/author derivation mirrors the on-chain verifier in
+/// so the produced signature verifies.
+async fn sign_scheduling_info( + payload: SchedulingInfoPayload, + internal_scheduling_parent_header: &RelayHeader, + para_client: &Arc, + authority_anchor: Block::Hash, + keystore: &KeystorePtr, +) -> Option +where + Block: BlockT, + Client: ProvideRuntimeApi + HeaderBackend, + Client::Api: AuraApi + ApiExt, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + let relay_slot: Slot = get_relay_slot(internal_scheduling_parent_header)?; + + // Authorities and slot duration are read at `authority_anchor` (the walked segment's tip), + // which is on the same fork as the blocks we are resubmitting. + let para_slot_duration = crate::slot_duration_at(&**para_client, authority_anchor).ok()?; + let para_slot_duration_ms = para_slot_duration.as_millis(); + let para_slot = relay_slot_to_para_slot(relay_slot, para_slot_duration_ms)?; + + let mut runtime_api = para_client.runtime_api(); + runtime_api.set_call_context(sp_core::traits::CallContext::Onchain { import: false }); + let authorities = runtime_api.authorities(authority_anchor).ok()?; + + // claim_slot returns the author public key only if we control its key in the keystore. + let author_pub = + aura_internal::claim_slot::

(Slot::from(para_slot), &authorities, keystore).await?; + + let signature = keystore + .sign_with( + <

::Public as AppCrypto>::ID, + <

::Public as AppCrypto>::CRYPTO_ID, + author_pub.as_slice(), + &payload.encode(), + ) + .ok() + .flatten()?; + let signature: [u8; 64] = signature.try_into().ok()?; + + Some(SignedSchedulingInfo { payload, signature }) +} + +/// Map a relay-chain slot to the parachain slot used for eligible-author selection. Mirrors the +/// formula used by the on-chain verifier in +/// . Returns `None` for a zero slot +/// duration so the caller skips signing rather than dividing by zero. +fn relay_slot_to_para_slot(relay_slot: Slot, para_slot_duration_ms: u64) -> Option { + if para_slot_duration_ms == 0 { + return None; + } + u64::from(relay_slot) + .saturating_mul(RELAY_CHAIN_SLOT_DURATION_MILLIS) + .checked_div(para_slot_duration_ms) +} + +/// Convert a libp2p [`PeerId`] into the on-chain [`ApprovedPeerId`] (a 64-byte-bounded vector). +fn peer_id_to_approved(peer_id: PeerId) -> ApprovedPeerId { + ApprovedPeerId::truncate_from(peer_id.to_bytes()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn relay_slot_to_para_slot_matches_verifier_formula() { + // 6s relay slot, 2s para slots: three para slots per relay slot. + assert_eq!(relay_slot_to_para_slot(Slot::from(10), 2_000), Some(30)); + // Equal durations: 1:1 mapping. + assert_eq!(relay_slot_to_para_slot(Slot::from(7), 6_000), Some(7)); + // 6s para slot duration with a 12s "double" duration: floor division. + assert_eq!(relay_slot_to_para_slot(Slot::from(5), 12_000), Some(2)); + // Zero duration is guarded rather than panicking. + assert_eq!(relay_slot_to_para_slot(Slot::from(1), 0), None); + } + + #[test] + fn peer_id_round_trips_through_approved_peer_id() { + let peer_id = PeerId::random(); + // PeerId encodings are ~38 bytes, well under the 64-byte bound, so no truncation. 
+ assert_eq!(peer_id_to_approved(peer_id).into_inner(), peer_id.to_bytes()); + } +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs b/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs index cb1e3cf9985dc..9639d9c0fc683 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs @@ -506,6 +506,16 @@ impl RelayChainInterface for TestRelayClient { unimplemented!("Not needed for test") } + async fn ancestor_relay_parent_info( + &self, + _at: RelayHash, + _session_index: SessionIndex, + _relay_parent: RelayHash, + ) -> RelayChainResult>> + { + unimplemented!("Not needed for test") + } + async fn node_features(&self, _at: RelayHash) -> RelayChainResult { Ok(NodeFeatures::default()) } @@ -621,6 +631,8 @@ impl RelayChainDataCache { claim_queue: claim_queue_snapshot, max_pov_size: 1024 * 1024, node_features, + max_relay_parent_session_age: 0, + session_index: 0, }; self.insert_test_data(relay_parent_hash, data); diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/unincluded_segment.rs b/cumulus/client/consensus/aura/src/collators/slot_based/unincluded_segment.rs new file mode 100644 index 0000000000000..f716b4ffc598a --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/unincluded_segment.rs @@ -0,0 +1,307 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Reconstruct the parachain's unincluded segment from local node state. +//! +//! Walks the canonical fork (per the parachain backend's fork choice) from the relay-chain +//! reported included block forward to the parachain's best block. The result is the +//! ordered list of parablock headers that have been authored locally but not yet included +//! on the relay chain. +//! +//! For V4 resubmission, each block's relay parent is checked against the ancestry of the +//! `scheduling_parent` we are building on; blocks whose relay parent is not an ancestor +//! are dropped, producing a potentially gapped list. The ancestry walk respects the +//! relay-chain `max_relay_parent_session_age` runtime limit, so older sessions are not +//! considered valid relay parents. + +use super::CollatorMessage; +use crate::collators::slot_based::relay_chain_data_cache::RelayChainDataCache; +use cumulus_primitives_core::{ + relay_chain::Hash as RelayHash, ParaId, PersistedValidationData, SchedulingProof, +}; +use cumulus_relay_chain_interface::RelayChainInterface; +use polkadot_primitives::{ + Block as RelayBlock, CoreIndex, Header as RelayHeader, ValidationCodeHash, +}; +use sc_client_api::Backend; +use sp_api::StorageProof; +use sp_blockchain::HeaderBackend; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::time::Instant; + +const LOG_TARGET: &str = "consensus::slot_based::unincluded_segment"; + +/// Reconstruct the unincluded segment as the canonical chain from the relay-chain reported +/// included block (exclusive) up to the parachain's best block (inclusive). 
+/// +/// Parablocks whose relay parent is not within the relay ancestry of `scheduling_parent` +/// (bounded by the relay chain's `max_relay_parent_session_age`) are dropped, so the returned +/// chain may contain gaps in block numbers. +/// +/// Returns headers ordered oldest first (just after the included block) to newest (best). +pub async fn reconstruct_unincluded_segment( + scheduling_parent: RelayHeader, + para_id: ParaId, + para_backend: &B, + relay_client: &RClient, + relay_chain_data_cache: &mut RelayChainDataCache, +) -> Vec +where + Block: BlockT, + B: Backend, + RClient: RelayChainInterface + Clone + 'static, +{ + let scheduling_parent_hash = scheduling_parent.hash(); + let Some((included_header, included_hash)) = + cumulus_client_consensus_common::fetch_included_from_relay_chain::( + relay_client, + para_backend, + para_id, + scheduling_parent_hash, + ) + .await + .ok() + .flatten() + else { + return Vec::new(); + }; + let included_rp = rp_from_digest::(&included_header); + + let best_hash = para_backend.blockchain().info().best_hash; + let Some(segment_newest_first) = walk_back(para_backend, best_hash, included_hash) else { + return Vec::new(); + }; + + let rp_ancestry = + build_scheduling_parent_ancestry(scheduling_parent, included_rp, relay_chain_data_cache) + .await; + + let mut filtered: Vec = segment_newest_first + .into_iter() + .filter(|h| rp_in_ancestry::(h, &rp_ancestry)) + .collect(); + filtered.reverse(); + filtered +} + +/// Walk parent pointers from `tip_hash` until `stop_hash` (the included block), collecting +/// headers along the way. Returns headers newest-first (excluding `stop_hash`). +/// +/// Returns `None` if any header on the path is missing (which would mean the local backend is +/// inconsistent with the relay-chain reported included block). 
+fn walk_back<Block, B>(
+	para_backend: &B,
+	tip_hash: Block::Hash,
+	stop_hash: Block::Hash,
+) -> Option<Vec<Block::Header>>
+where
+	Block: BlockT,
+	B: Backend<Block>,
+{
+	let mut acc = Vec::new();
+	let mut cursor = tip_hash;
+	// The loop ends only by reaching `stop_hash` or by failing a header lookup; a failed or
+	// empty lookup aborts the whole walk rather than returning a partial chain.
+	while cursor != stop_hash {
+		let header = match para_backend.blockchain().header(cursor) {
+			Ok(Some(h)) => h,
+			_ => {
+				tracing::warn!(
+					target: LOG_TARGET,
+					hash = ?cursor,
+					"Missing parachain header while walking back to included block.",
+				);
+				return None;
+			},
+		};
+		cursor = *header.parent_hash();
+		acc.push(header);
+	}
+	Some(acc)
+}
+
+/// Collect the relay-chain ancestry of `scheduling_parent` that is admissible as a parablock
+/// relay parent. The walk stops at the earliest of:
+/// * the relay parent of the included head (inclusive) — anything older cannot be the relay
+///   parent of an unincluded block in practice;
+/// * `max_relay_parent_session_age` sessions back, matching the relay-chain runtime limit;
+/// * the relay-chain genesis;
+/// * the first ancestor whose relay data cannot be fetched through the cache.
+///
+/// Session boundaries are detected via BABE epoch-change digests on the walked relay headers,
+/// matching the rule applied in `offset_relay_parent_find_descendants`. A header carrying an
+/// epoch-change digest is itself still counted in the newer session; only its ancestors are one
+/// session older. Returns the admissible ancestor hashes newest-first, including
+/// `scheduling_parent` itself.
+///
+/// If the session-age limit cannot be read at `scheduling_parent`, it is treated as `0`, i.e.
+/// the walk never crosses a session boundary.
+async fn build_scheduling_parent_ancestry<RClient>(
+	scheduling_parent: RelayHeader,
+	included_rp: Option<RelayHash>,
+	relay_chain_data_cache: &mut RelayChainDataCache<RClient>,
+) -> Vec<RelayHash>
+where
+	RClient: RelayChainInterface + Clone + 'static,
+{
+	let max_relay_parent_session_age = relay_chain_data_cache
+		.get_by_hash(scheduling_parent.hash())
+		.await
+		.map(|d| d.max_relay_parent_session_age)
+		.unwrap_or(0);
+	let mut ancestry = Vec::new();
+	let mut current = scheduling_parent;
+	let mut session_age: u32 = 0;
+	loop {
+		// Accept `current` first: it is admissible at the session age accumulated so far.
+		let current_hash = current.hash();
+		ancestry.push(current_hash);
+
+		// Stop once we've recorded the relay parent of the included head: nothing older can be
+		// the relay parent of an unincluded block.
+		if Some(current_hash) == included_rp {
+			break;
+		}
+
+		if current.number == 0 {
+			break;
+		}
+
+		// Session boundary: the current header carries an epoch-change digest, so the *parent*
+		// belongs to an older session. Bump only now that `current` has been accepted, and
+		// refuse to step to the parent if that would exceed the allowed session age.
+		if sc_consensus_babe::contains_epoch_change::<RelayBlock>(&current) {
+			session_age = session_age.saturating_add(1);
+			if session_age > max_relay_parent_session_age {
+				break;
+			}
+		}
+
+		let parent_hash = *current.parent_hash();
+		match relay_chain_data_cache.get_by_hash(parent_hash).await {
+			Ok(data) => current = data.relay_header.clone(),
+			// The ancestor is unavailable: return what has been collected so far.
+			Err(()) => break,
+		}
+	}
+	ancestry
+}
+
+/// Extract the relay parent hash that a parablock built against, from its digest.
+fn rp_from_digest<Block: BlockT>(header: &Block::Header) -> Option<RelayHash> {
+	cumulus_primitives_core::extract_relay_parent(header.digest())
+}
+
+/// Whether the parablock's relay parent is within the precomputed relay ancestry.
+///
+/// A header without a relay-parent digest is never considered part of the ancestry.
+fn rp_in_ancestry<Block: BlockT>(header: &Block::Header, ancestry: &[RelayHash]) -> bool {
+	rp_from_digest::<Block>(header).is_some_and(|rp| ancestry.contains(&rp))
+}
+
+// ---------------------------------------------------------------------------
+// Segment shapes used by the collation task to assemble multi-entry submissions.
+//
+// The on-wire channel payload from the builder is `super::CollatorMessage` — always one bundle.
+// Everything multi-entry lives below and never crosses the channel.
+// ---------------------------------------------------------------------------
+
+/// One built bundle's payload, stripped of core/proof metadata.
+pub struct SegmentEntry<Block: BlockT> {
+	/// The hash of the relay chain block that provides the context for the parachain block.
+	pub relay_parent: RelayHash,
+	/// The header of the parent block.
+	pub parent_header: Block::Header,
+	/// The built blocks.
+	pub blocks: Vec<Block>,
+	/// The storage proof that was collected while building all the blocks.
+	pub proof: StorageProof,
+	/// The validation code hash at the parent block.
+	pub validation_code_hash: ValidationCodeHash,
+	/// The persisted validation data for this collation.
+	pub validation_data: PersistedValidationData,
+}
+
+impl<Block: BlockT> From<CollatorMessage<Block>> for SegmentEntry<Block> {
+	/// Keep the per-bundle payload of `msg`; its `scheduling_proof` and `core_index` are dropped.
+	fn from(msg: CollatorMessage<Block>) -> Self {
+		Self {
+			relay_parent: msg.relay_parent,
+			parent_header: msg.parent_header,
+			blocks: msg.blocks,
+			proof: msg.proof,
+			validation_code_hash: msg.validation_code_hash,
+			validation_data: msg.validation_data,
+		}
+	}
+}
+
+/// Walked + loaded prior unincluded segment, *unsigned*. Lives between `HydrateSegment` and
+/// `FinalizeSegment`. Non-empty by construction.
+pub struct HydratedSegment<Block: BlockT> {
+	/// One entry per unincluded block that was walked and loaded.
+	pub entries: Vec<SegmentEntry<Block>>,
+	/// Newest unincluded block; anchor for the authority/slot-duration lookup at signing time.
+	pub authority_anchor: Block::Hash,
+}
+
+/// Signed prior segment held by the collation task until either the matching first-core
+/// `CollatorMessage` arrives (merge then submit) or `deadline` elapses (submit alone).
+pub struct PriorSegment<Block: BlockT> {
+	/// Signed scheduling proof the whole segment is submitted under.
+	pub scheduling_proof: SchedulingProof,
+	/// Core the segment is resubmitted on.
+	pub core_index: CoreIndex,
+	/// Point in time after which the segment is submitted without waiting for new blocks.
+	pub deadline: Instant,
+	/// The resubmitted entries, in the order they were hydrated.
+	pub entries: Vec<SegmentEntry<Block>>,
+}
+
+impl<Block: BlockT> PriorSegment<Block> {
+	/// First-core blocks arrived: append, then submit under the prior's signed proof.
+	///
+	/// The scheduling proof carried by `msg` is discarded in favour of the one held here. A core
+	/// mismatch between `msg` and the held segment is only checked in debug builds.
+	pub fn merge_first_core(mut self, msg: CollatorMessage<Block>) -> Segment<Block> {
+		debug_assert_eq!(self.core_index, msg.core_index);
+		self.entries.push(msg.into());
+		self.into_segment()
+	}
+
+	/// Wrap the held entries, unchanged, under the signed proof.
+	///
+	/// Deadline path: the first core built nothing this slot.
+	pub fn into_segment(self) -> Segment<Block> {
+		Segment {
+			scheduling_proof: Some(self.scheduling_proof),
+			core_index: self.core_index,
+			entries: self.entries,
+		}
+	}
+}
+
+/// Submit-ready segment for one core.
+pub struct Segment { + pub scheduling_proof: Option, + pub core_index: CoreIndex, + pub entries: Vec>, +} + +impl Segment { + pub fn singleton(msg: CollatorMessage) -> Self { + let CollatorMessage { + scheduling_proof, + core_index, + relay_parent, + parent_header, + blocks, + proof, + validation_code_hash, + validation_data, + } = msg; + Self { + scheduling_proof, + core_index, + entries: vec![SegmentEntry { + relay_parent, + parent_header, + blocks, + proof, + validation_code_hash, + validation_data, + }], + } + } +} diff --git a/cumulus/client/consensus/common/Cargo.toml b/cumulus/client/consensus/common/Cargo.toml index c65e05c02266a..558dcf1699626 100644 --- a/cumulus/client/consensus/common/Cargo.toml +++ b/cumulus/client/consensus/common/Cargo.toml @@ -54,3 +54,4 @@ sp-tracing = { workspace = true, default-features = true } # Cumulus cumulus-test-client = { workspace = true } cumulus-test-relay-sproof-builder = { workspace = true, default-features = true } +substrate-test-runtime = { workspace = true } diff --git a/cumulus/client/consensus/common/src/finality.rs b/cumulus/client/consensus/common/src/finality.rs new file mode 100644 index 0000000000000..92c16de2a35b3 --- /dev/null +++ b/cumulus/client/consensus/common/src/finality.rs @@ -0,0 +1,162 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Shared helpers for finality-driven aux-storage cleanup. + +use sc_client_api::HeaderBackend; +use sp_runtime::traits::{Block as BlockT, Header as _}; + +const LOG_TARGET: &str = "consensus::common::finality"; + +/// Compute the previously-finalized block hash from a tree route and a fallback parent hash. +/// +/// This is the parent of the first block in the tree route, or the supplied `fallback_parent` +/// (typically the parent hash of the just-finalized header) when the tree route is empty or its +/// first block's header can't be loaded. +/// +/// Taking the inputs directly rather than a `FinalityNotification` keeps this function unit- +/// testable from outside `sc-client-api` (which has a private `unpin_handle` field). +pub fn old_finalized_hash( + client: &C, + tree_route: &[Block::Hash], + fallback_parent: Block::Hash, +) -> Block::Hash +where + C: HeaderBackend, + Block: BlockT, +{ + let Some(first) = tree_route.first() else { + return fallback_parent; + }; + + match client.header(*first) { + Ok(Some(header)) => *header.parent_hash(), + Ok(None) => { + tracing::warn!( + target: LOG_TARGET, + ?first, + "tree_route head header missing; falling back to notification parent hash", + ); + fallback_parent + }, + Err(error) => { + tracing::warn!( + target: LOG_TARGET, + ?first, + ?error, + "tree_route head header lookup failed; falling back to notification parent hash", + ); + fallback_parent + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_blockchain::Result as ClientResult; + + type Block = substrate_test_runtime::Block; + type Hash = ::Hash; + + // Minimal `HeaderBackend` mock for the `old_finalized_hash_*` tests. + // + // `lookup` is `None` ⇒ `header()` returns `Ok(None)` (simulates a missing header). 
+ struct MockHeaderBackend { + lookup: Option<(Hash, substrate_test_runtime::Header)>, + } + + impl HeaderBackend for MockHeaderBackend { + fn header( + &self, + hash: ::Hash, + ) -> ClientResult::Header>> { + Ok(self.lookup.as_ref().and_then(|(h, hdr)| (*h == hash).then(|| hdr.clone()))) + } + + fn info(&self) -> sc_client_api::blockchain::Info { + unimplemented!() + } + + fn status( + &self, + _hash: ::Hash, + ) -> ClientResult { + unimplemented!() + } + + fn number( + &self, + _hash: ::Hash, + ) -> ClientResult>> { + unimplemented!() + } + + fn hash( + &self, + _number: sp_runtime::traits::NumberFor, + ) -> ClientResult::Hash>> { + unimplemented!() + } + } + + #[test] + fn old_finalized_hash_with_empty_tree_route() { + let client = MockHeaderBackend { lookup: None }; + let fallback = Hash::repeat_byte(0x01); + + let old_hash = old_finalized_hash::<_, Block>(&client, &[], fallback); + assert_eq!( + old_hash, fallback, + "empty tree_route should fall through to the supplied parent" + ); + } + + #[test] + fn old_finalized_hash_with_tree_route() { + use substrate_test_runtime::Header as TestHeader; + + let expected_old = Hash::repeat_byte(0x01); + let tree_block = Hash::repeat_byte(0x02); + + let header = TestHeader { + parent_hash: expected_old, + number: 2, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + let client = MockHeaderBackend { lookup: Some((tree_block, header)) }; + let fallback = Hash::repeat_byte(0xFF); + + let old_hash = old_finalized_hash::<_, Block>(&client, &[tree_block], fallback); + assert_eq!(old_hash, expected_old, "should resolve to parent of first tree_route block"); + } + + #[test] + fn old_finalized_hash_falls_back_when_header_missing() { + // Non-empty tree_route, but the header lookup returns None — the function should fall back + // to `fallback_parent` rather than panicking or returning a wrong hash. 
+ let client = MockHeaderBackend { lookup: None }; + let tree_block = Hash::repeat_byte(0x02); + let fallback = Hash::repeat_byte(0xAA); + + let old_hash = old_finalized_hash::<_, Block>(&client, &[tree_block], fallback); + assert_eq!(old_hash, fallback, "missing header should fall back to the supplied parent"); + } +} diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index d7d135f98a842..c4aa295a714f4 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -30,12 +30,14 @@ use sp_timestamp::Timestamp; use std::{sync::Arc, time::Duration}; +mod finality; mod level_monitor; mod parachain_consensus; mod parent_search; #[cfg(test)] mod tests; +pub use finality::old_finalized_hash; pub use parent_search::*; pub use cumulus_relay_chain_streams::finalized_heads; diff --git a/cumulus/client/consensus/common/src/parent_search.rs b/cumulus/client/consensus/common/src/parent_search.rs index 3d7729f3b98a0..0676a03736923 100644 --- a/cumulus/client/consensus/common/src/parent_search.rs +++ b/cumulus/client/consensus/common/src/parent_search.rs @@ -79,30 +79,59 @@ pub async fn find_parent_for_building( ) -> Result>, RelayChainError> { tracing::trace!(target: LOG_TARGET, "Parent search parameters: {params:?}"); - // Get the included block. - let Some((included_header, included_hash)) = - fetch_included_from_relay_chain(relay_client, backend, params.para_id, params.relay_parent) + let Some(ParentSearchStart { included_header, start_hash, start_header }) = + pick_parent_search_start(relay_client, backend, params.para_id, params.relay_parent) .await? else { return Ok(None); }; - // Fetch the pending block if one exists. + // Build up the ancestry record of the relay chain to compare against. 
+ let rp_ancestry = + build_relay_parent_ancestry(params.ancestry_lookback, params.relay_parent, relay_client) + .await?; + + // Search for the deepest valid parent starting from the pending/included block. + let best_parent_header = + find_deepest_valid_parent(start_hash, start_header, backend, &rp_ancestry); + + Ok(Some(ParentSearchResult { included_header, best_parent_header })) +} + +/// Anchor for a parent search: the truly-included head plus the chosen `start` block (pending +/// availability if reported and locally known and a descendant of included, otherwise included). +pub struct ParentSearchStart { + pub included_header: B::Header, + pub start_hash: B::Hash, + pub start_header: B::Header, +} + +/// Pick the start block for a parent search at `relay_parent`. Prefers pending availability over +/// the truly-included head. Returns `Ok(None)` when the included head is unknown locally, when a +/// reported pending head is unusable (not locally known, undecodable, or not a descendant of +/// included), or when any required relay-chain call fails to produce a head. +pub async fn pick_parent_search_start( + relay_client: &impl RelayChainInterface, + backend: &impl Backend, + para_id: ParaId, + relay_parent: RelayHash, +) -> Result>, RelayChainError> { + let Some((included_header, included_hash)) = + fetch_included_from_relay_chain(relay_client, backend, para_id, relay_parent).await? + else { + return Ok(None); + }; + + // Fetch the pending block if one exists. `OccupiedCoreAssumption::Included` enacts the + // candidate pending availability before returning, so the head we get is what the next + // block should be built on top of. let maybe_pending = { - // Fetch the most recent pending header from the relay chain. We use - // `OccupiedCoreAssumption::Included` so the candidate pending availability gets enacted - // before being returned to us. 
let pending_header = relay_client - .persisted_validation_data( - params.relay_parent, - params.para_id, - OccupiedCoreAssumption::Included, - ) + .persisted_validation_data(relay_parent, para_id, OccupiedCoreAssumption::Included) .await? .and_then(|p| B::Header::decode(&mut &p.parent_head.0[..]).ok()) .filter(|x| x.hash() != included_hash); - // If the pending block is not locally known, we can't proceed. if let Some(header) = pending_header { let pending_hash = header.hash(); let Ok(Some(header)) = backend.blockchain().header(pending_hash) else { @@ -119,10 +148,8 @@ pub async fn find_parent_for_building( } }; - // Determine the starting point for the search. let (start_hash, start_header) = match &maybe_pending { Some((pending_header, pending_hash)) => { - // Verify pending is a descendant of included. let route = sp_blockchain::tree_route(backend.blockchain(), included_hash, *pending_hash)?; if !route.retracted().is_empty() { @@ -137,20 +164,11 @@ pub async fn find_parent_for_building( None => (included_hash, included_header.clone()), }; - // Build up the ancestry record of the relay chain to compare against. - let rp_ancestry = - build_relay_parent_ancestry(params.ancestry_lookback, params.relay_parent, relay_client) - .await?; - - // Search for the deepest valid parent starting from the pending/included block. - let best_parent_header = - find_deepest_valid_parent(start_hash, start_header, backend, &rp_ancestry); - - Ok(Some(ParentSearchResult { included_header, best_parent_header })) + Ok(Some(ParentSearchStart { included_header, start_hash, start_header })) } /// Fetch the included block from the relay chain. 
-async fn fetch_included_from_relay_chain( +pub async fn fetch_included_from_relay_chain( relay_client: &impl RelayChainInterface, backend: &impl Backend, para_id: ParaId, diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index 0f7e273e456e2..c4eb2dfaf9f0c 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -309,6 +309,16 @@ impl RelayChainInterface for Relaychain { unimplemented!("Not needed for test") } + async fn ancestor_relay_parent_info( + &self, + _at: PHash, + _session_index: SessionIndex, + _relay_parent: PHash, + ) -> RelayChainResult>> + { + unimplemented!("Not needed for test") + } + async fn node_features(&self, _at: Hash) -> RelayChainResult { unimplemented!("Not needed for test") } diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index f673c32b7682f..755cd5a0bbec3 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -368,6 +368,16 @@ impl RelayChainInterface for DummyRelayChainInterface { unimplemented!("Not needed for test") } + async fn ancestor_relay_parent_info( + &self, + _at: PHash, + _session_index: SessionIndex, + _relay_parent: PHash, + ) -> RelayChainResult>> + { + unimplemented!("Not needed for test") + } + async fn node_features(&self, _at: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs index d05e3e3442168..e28c0423e396e 100644 --- a/cumulus/client/pov-recovery/src/tests.rs +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -22,8 +22,9 @@ use cumulus_primitives_core::relay_chain::{ BlockId, CandidateCommitments, CandidateDescriptorV2, CoreIndex, CoreState, }; use cumulus_relay_chain_interface::{ - ChildInfo, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader, - PersistedValidationData, 
RelayChainResult, StorageValue, ValidationCodeHash, ValidatorId, + BlockNumber, ChildInfo, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, + PHash, PHeader, PersistedValidationData, RelayChainResult, SessionIndex, StorageValue, + ValidationCodeHash, ValidatorId, }; use cumulus_test_client::runtime::{Block, Header}; use futures::{channel::mpsc, SinkExt, Stream}; @@ -533,6 +534,16 @@ impl RelayChainInterface for Relaychain { unimplemented!("Not needed for test"); } + async fn ancestor_relay_parent_info( + &self, + _at: PHash, + _session_index: SessionIndex, + _relay_parent: PHash, + ) -> RelayChainResult>> + { + unimplemented!("Not needed for test"); + } + async fn node_features(&self, _at: PHash) -> RelayChainResult { unimplemented!("Not needed for test"); } diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 552a98e30d0c9..ff6fad4969a57 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -34,7 +34,7 @@ use cumulus_primitives_core::{ InboundDownwardMessage, ParaId, PersistedValidationData, }; use cumulus_relay_chain_interface::{ - ChildInfo, RelayChainError, RelayChainInterface, RelayChainResult, + ChildInfo, RelayChainError, RelayChainInterface, RelayChainResult, RelayParentInfo, }; use futures::{FutureExt, Stream, StreamExt}; use polkadot_primitives::{CandidateEvent, NodeFeatures}; @@ -353,6 +353,18 @@ impl RelayChainInterface for RelayChainInProcessInterface { Ok(self.full_client.runtime_api().max_relay_parent_session_age(at)?) } + async fn ancestor_relay_parent_info( + &self, + at: PHash, + session_index: SessionIndex, + relay_parent: PHash, + ) -> RelayChainResult>> { + Ok(self + .full_client + .runtime_api() + .ancestor_relay_parent_info(at, session_index, relay_parent)?) 
+ } + async fn node_features(&self, at: PHash) -> RelayChainResult { Ok(self.full_client.runtime_api().node_features(at)?) } diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index d76a43428c7ec..9581356c02efd 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -34,6 +34,7 @@ use sp_api::ApiError; use cumulus_primitives_core::relay_chain::{ BlockId, CandidateEvent, Hash as RelayHash, NodeFeatures, }; +pub use cumulus_primitives_core::relay_chain::vstaging::RelayParentInfo; pub use cumulus_primitives_core::{ relay_chain::{ BlockNumber, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, @@ -264,6 +265,17 @@ pub trait RelayChainInterface: Send + Sync { async fn max_relay_parent_session_age(&self, at: RelayHash) -> RelayChainResult; + /// Look up info for `relay_parent` (an ancestor of `at`) in session `session_index`. + /// Returns `None` if `relay_parent` is not in `at`'s allowed relay parents for that + /// session. A block is never in its own allowed list, so calling this with + /// `relay_parent == at` always yields `None`. 
+ async fn ancestor_relay_parent_info( + &self, + at: RelayHash, + session_index: SessionIndex, + relay_parent: RelayHash, + ) -> RelayChainResult>>; + async fn node_features(&self, at: RelayHash) -> RelayChainResult; } @@ -441,6 +453,15 @@ where (**self).max_relay_parent_session_age(at).await } + async fn ancestor_relay_parent_info( + &self, + at: RelayHash, + session_index: SessionIndex, + relay_parent: RelayHash, + ) -> RelayChainResult>> { + (**self).ancestor_relay_parent_info(at, session_index, relay_parent).await + } + async fn node_features(&self, at: RelayHash) -> RelayChainResult { (**self).node_features(at).await } diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index 12e5478dd850f..208d42ae54147 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -27,7 +27,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_interface::{ BlockNumber, ChildInfo, CoreIndex, CoreState, PHeader, RelayChainError, RelayChainInterface, - RelayChainResult, + RelayChainResult, RelayParentInfo, }; use futures::{FutureExt, Stream, StreamExt}; use polkadot_overseer::Handle; @@ -311,6 +311,17 @@ impl RelayChainInterface for RelayChainRpcInterface { self.rpc_client.parachain_host_max_relay_parent_session_age(at).await } + async fn ancestor_relay_parent_info( + &self, + at: RelayHash, + session_index: SessionIndex, + relay_parent: RelayHash, + ) -> RelayChainResult>> { + self.rpc_client + .parachain_host_ancestor_relay_parent_info(at, session_index, relay_parent) + .await + } + async fn node_features(&self, at: RelayHash) -> RelayChainResult { self.rpc_client.parachain_host_node_features(at).await } diff --git a/cumulus/client/unincluded-segment-store/Cargo.toml b/cumulus/client/unincluded-segment-store/Cargo.toml new file mode 100644 index 0000000000000..70636b33421d2 --- /dev/null +++ 
b/cumulus/client/unincluded-segment-store/Cargo.toml @@ -0,0 +1,26 @@ +[package] +authors.workspace = true +name = "cumulus-client-unincluded-segment-store" +version = "0.1.0" +edition.workspace = true +description = "Per-block proof store for unincluded segment resubmission." +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +codec = { workspace = true, default-features = true } +cumulus-client-consensus-common = { workspace = true, default-features = true } +sc-client-api = { workspace = true, default-features = true } +sp-blockchain = { workspace = true, default-features = true } +sp-runtime = { workspace = true, default-features = true } +sp-trie = { workspace = true, default-features = true } + +[dev-dependencies] +hex = { workspace = true, default-features = true } +sc-client-db = { workspace = true, default-features = true } +substrate-test-runtime = { workspace = true } +tempfile = { workspace = true } diff --git a/cumulus/client/unincluded-segment-store/src/lib.rs b/cumulus/client/unincluded-segment-store/src/lib.rs new file mode 100644 index 0000000000000..873416134cf31 --- /dev/null +++ b/cumulus/client/unincluded-segment-store/src/lib.rs @@ -0,0 +1,440 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see <https://www.gnu.org/licenses/>. + +//! Per-block proof store for unincluded segment resubmission. +//! +//! Persists `(time, StorageProof)` per imported parablock, keyed by parablock hash. Data is +//! pruned on parachain finality. + +use codec::{Decode, Encode}; +use cumulus_client_consensus_common::old_finalized_hash; +use sc_client_api::{ + backend::AuxStore, + client::{AuxDataOperations, FinalityNotification, PreCommitActions}, + HeaderBackend, +}; +use sp_blockchain::{Error as ClientError, Result as ClientResult}; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use sp_trie::StorageProof; +use std::{ + marker::PhantomData, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +const STORE_VERSION_KEY: &[u8] = b"cumulus_unincluded_segment_store_version"; +const STORE_CURRENT_VERSION: u32 = 1; +const STORE_ENTRY_PREFIX: &[u8] = b"cumulus_unincluded_segment_store"; + +/// Return the current Unix milliseconds timestamp. +pub fn now_unix_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock is before UNIX epoch; qed") + .as_millis() as u64 +} + +/// Entry stored in aux storage for each unincluded parablock. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct StoredEntry { + /// Unix millis at the moment block_import started. + pub time_ms: u64, + /// The storage proof captured at block_import. + pub proof: StorageProof, +} + +fn entry_key<H: Encode>(block_hash: H) -> Vec<u8> { + (STORE_ENTRY_PREFIX, block_hash).encode() +} + +/// Per-block proof store backed by `AuxStore`. +pub struct UnincludedSegmentStore<Block, B> { + backend: Arc<B>, + _marker: PhantomData<fn() -> Block>, +} + +impl<Block, B> Clone for UnincludedSegmentStore<Block, B> { + fn clone(&self) -> Self { + Self { backend: self.backend.clone(), _marker: PhantomData } + } +} + +impl<Block, B> UnincludedSegmentStore<Block, B> { + /// Create a new store over `backend`.
+ pub fn new(backend: Arc<B>) -> Self { + Self { backend, _marker: PhantomData } + } +} + +/// Build the aux-data key/value pairs to commit alongside a block. +/// +/// The caller should push these into `BlockImportParams::auxiliary` so they commit in the +/// same DB transaction as the block. Stateless — no backend access required. +pub fn prepare_aux_data<Block: BlockT>( + block_hash: Block::Hash, + time_ms: u64, + proof: &StorageProof, +) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> { + let encoded_entry = (time_ms, proof).encode(); + let encoded_version = STORE_CURRENT_VERSION.encode(); + + [(entry_key(block_hash), encoded_entry), (STORE_VERSION_KEY.to_vec(), encoded_version)] + .into_iter() +} + +impl<Block: BlockT, B: AuxStore> UnincludedSegmentStore<Block, B> { + /// Load the entry stored for `block_hash`, if any. + pub fn load(&self, block_hash: Block::Hash) -> ClientResult<Option<StoredEntry>> { + let version = self.decode_aux::<u32>(STORE_VERSION_KEY)?; + + match version { + None => Ok(None), + Some(STORE_CURRENT_VERSION) => self.decode_aux(entry_key(block_hash).as_slice()), + Some(other) => Err(ClientError::Backend(format!( + "Unsupported unincluded segment store DB version: {:?}", + other + ))), + } + } + + fn decode_aux<T: Decode>(&self, key: &[u8]) -> ClientResult<Option<T>> { + match self.backend.get_aux(key)? { + None => Ok(None), + Some(t) => T::decode(&mut &t[..]).map(Some).map_err(|e| { + ClientError::Backend(format!( + "Unincluded segment store DB is corrupted. Decode error: {}", + e + )) + }), + } + } +} + +impl<Block, B> UnincludedSegmentStore<Block, B> +where + Block: BlockT, + B: PreCommitActions<Block> + HeaderBackend<Block> + 'static, +{ + /// Register a finality hook that prunes entries for the just-finalized chain, the + /// intermediate tree route, the prior finalized head, and any stale forks. + /// + /// TODO(#12034): also prune entries whose relay-parent session is older than + /// `max_relay_parent_session_age` relative to the current session. Session info will be + /// sourced from a separate session-data cache (see #11624), not from per-entry storage.
+ pub fn register_cleanup(&self) { + let client = self.backend.clone(); + let on_finality = move |notification: &FinalityNotification<Block>| -> AuxDataOperations { + let old_finalized = old_finalized_hash::<_, Block>( + &*client, + &notification.tree_route, + *notification.header.parent_hash(), + ); + + finality_cleanup_ops::<Block>( + notification.hash, + old_finalized, + &notification.tree_route, + notification.stale_blocks.iter().map(|b| b.hash), + ) + }; + + self.backend.register_finality_action(Box::new(on_finality)); + } +} + +/// Compute aux storage cleanup operations. +/// +/// Emits deletes for stale-fork blocks, intermediate tree-route blocks, the pre-finality head, +/// and the just-finalized block itself. Once a block is finalized it is no longer in any +/// unincluded segment, so its proof entry is dead weight. +fn finality_cleanup_ops<Block: BlockT>( + just_finalized_hash: Block::Hash, + old_finalized_hash: Block::Hash, + tree_route: &[Block::Hash], + stale_block_hashes: impl IntoIterator<Item = Block::Hash>, +) -> AuxDataOperations { + let stale_iter = stale_block_hashes.into_iter(); + + let mut ops = Vec::with_capacity(stale_iter.size_hint().0 + tree_route.len() + 2); + ops.extend(stale_iter.map(|hash| (entry_key(hash), None))); + ops.extend(tree_route.iter().map(|hash| (entry_key(hash), None))); + ops.push((entry_key(old_finalized_hash), None)); + ops.push((entry_key(just_finalized_hash), None)); + + ops +} + +#[cfg(test)] +mod tests { + use super::*; + use sc_client_api::backend::AuxStore; + + type Block = substrate_test_runtime::Block; + type Hash = <Block as BlockT>::Hash; + type TestBackend = sc_client_api::in_mem::Backend<Block>; + type Store = UnincludedSegmentStore<Block, TestBackend>; + + fn create_test_entry(time_ms: u64) -> StoredEntry { + StoredEntry { time_ms, proof: StorageProof::new(vec![vec![1, 2, 3], vec![4, 5, 6]]) } + } + + fn new_store() -> (Arc<TestBackend>, Store) { + let backend = Arc::new(TestBackend::new()); + let store = Store::new(backend.clone()); + (backend, store) + } + + fn write_via_store(backend: &Arc<TestBackend>, hash: Hash, entry: 
&StoredEntry) { + let pairs: Vec<_> = prepare_aux_data::(hash, entry.time_ms, &entry.proof).collect(); + let insert_pairs: Vec<_> = + pairs.iter().map(|(k, v)| (k.as_slice(), v.as_slice())).collect(); + AuxStore::insert_aux(&**backend, &insert_pairs, &[]).expect("aux insert should succeed"); + } + + #[test] + fn prepare_produces_expected_key_value_pairs() { + let hash = Hash::repeat_byte(0xAB); + let time_ms = 1234567890u64; + let proof = StorageProof::new(vec![vec![10, 20, 30]]); + + let pairs: Vec<_> = prepare_aux_data::(hash, time_ms, &proof).collect(); + + assert_eq!(pairs.len(), 2); + + let expected_key = (STORE_ENTRY_PREFIX, hash).encode(); + assert_eq!(pairs[0].0, expected_key); + + let decoded_entry = + StoredEntry::decode(&mut pairs[0].1.as_slice()).expect("entry should decode"); + assert_eq!(decoded_entry.time_ms, time_ms); + assert_eq!(decoded_entry.proof, proof); + + assert_eq!(pairs[1].0, STORE_VERSION_KEY.to_vec()); + let decoded_version = + u32::decode(&mut pairs[1].1.as_slice()).expect("version should decode"); + assert_eq!(decoded_version, STORE_CURRENT_VERSION); + } + + #[test] + fn load_returns_none_when_no_entry_exists() { + let (_backend, store) = new_store(); + let hash = Hash::repeat_byte(0xEF); + + assert_eq!(store.load(hash).expect("load should succeed"), None); + } + + #[test] + fn cleanup_combines_all_categories() { + let stale_1 = Hash::repeat_byte(0xAA); + let stale_2 = Hash::repeat_byte(0xBB); + let route_1 = Hash::repeat_byte(0xC1); + let route_2 = Hash::repeat_byte(0xC2); + let old_finalized = Hash::repeat_byte(0xF0); + let just_finalized = Hash::repeat_byte(0xFF); + + let ops = finality_cleanup_ops::( + just_finalized, + old_finalized, + &[route_1, route_2], + [stale_1, stale_2], + ); + + let keys: Vec<_> = ops.iter().map(|(k, _)| k.clone()).collect(); + + assert!(keys.contains(&entry_key(stale_1))); + assert!(keys.contains(&entry_key(stale_2))); + assert!(keys.contains(&entry_key(route_1))); + 
assert!(keys.contains(&entry_key(route_2))); + assert!(keys.contains(&entry_key(old_finalized))); + assert!(keys.contains(&entry_key(just_finalized))); + + assert!(ops.iter().all(|(_, v)| v.is_none())); + } + + #[test] + fn cleanup_handles_empty_inputs() { + let just_finalized = Hash::repeat_byte(0xFF); + let old_finalized = Hash::repeat_byte(0xF0); + + let ops = finality_cleanup_ops::( + just_finalized, + old_finalized, + &[], + std::iter::empty::(), + ); + + assert_eq!(ops.len(), 2); + assert!(ops.iter().all(|(_, v)| v.is_none())); + } + + #[test] + fn stored_entry_encoding_hex_snapshot() { + let entry = + StoredEntry { time_ms: 1234567890u64, proof: StorageProof::new(vec![vec![1, 2, 3]]) }; + + let encoded = entry.encode(); + // Snapshot of the SCALE encoding. If this assertion fires, the on-disk format of + // `StoredEntry` has changed and existing aux entries written by older builds will fail + // to decode — bump `STORE_CURRENT_VERSION` and add a migration before updating this + // snapshot. + let encoded_hex = hex::encode(&encoded); + // time_ms = 1234567890 little-endian u64 = d2 02 96 49 00 00 00 00 + // proof = Vec> with one element [1,2,3]: outer len 1 (SCALE compact = 04), + // inner len 3 (compact = 0c), bytes 01 02 03 + let expected_hex = "d20296490000000004 0c 010203".replace(' ', ""); + assert_eq!(encoded_hex, expected_hex, "StoredEntry encoding changed!"); + + let decoded = StoredEntry::decode(&mut encoded.as_slice()).expect("decode should succeed"); + assert_eq!(entry, decoded); + } + + #[test] + fn decode_corrupted_entry_body() { + let (backend, store) = new_store(); + let hash = Hash::repeat_byte(0xAB); + + // Write correct version. + let version_encoded = STORE_CURRENT_VERSION.encode(); + AuxStore::insert_aux(&*backend, &[(STORE_VERSION_KEY, version_encoded.as_slice())], &[]) + .expect("aux insert should succeed"); + + // Write bogus entry body. 
+ let key = entry_key(hash); + let bogus_data = vec![0xFF, 0xAA, 0xBB]; + AuxStore::insert_aux(&*backend, &[(&key[..], bogus_data.as_slice())], &[]) + .expect("aux insert should succeed"); + + let result = store.load(hash); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("DB is corrupted") && err_msg.contains("Decode error"), + "unexpected error: {}", + err_msg + ); + } + + #[test] + fn end_to_end_write_cleanup_load() { + let (backend, store) = new_store(); + + let hash1 = Hash::repeat_byte(0x01); + let hash2 = Hash::repeat_byte(0x02); + let hash3 = Hash::repeat_byte(0x03); + + let entry1 = create_test_entry(1000); + let entry2 = create_test_entry(2000); + let entry3 = create_test_entry(3000); + + write_via_store(&backend, hash1, &entry1); + write_via_store(&backend, hash2, &entry2); + write_via_store(&backend, hash3, &entry3); + + assert_eq!(store.load(hash1).expect("load"), Some(entry1)); + assert_eq!(store.load(hash2).expect("load"), Some(entry2)); + assert_eq!(store.load(hash3).expect("load"), Some(entry3.clone())); + + // Generate cleanup that deletes hash1 (just-finalized) and hash2 (in tree route). 
+ let ops = finality_cleanup_ops::( + hash1, + Hash::repeat_byte(0xF0), + &[hash2], + std::iter::empty::(), + ); + let delete_keys: Vec<_> = + ops.iter().filter_map(|(k, v)| v.is_none().then(|| k.as_slice())).collect(); + + AuxStore::insert_aux(&*backend, &[], &delete_keys).expect("delete should succeed"); + + assert_eq!(store.load(hash1).expect("load"), None, "hash1 should be deleted"); + assert_eq!(store.load(hash2).expect("load"), None, "hash2 should be deleted"); + assert_eq!(store.load(hash3).expect("load"), Some(entry3), "hash3 should survive"); + } + + #[test] + fn entries_survive_disk_restart() { + use sc_client_db::{ + Backend as DbBackend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode, + }; + + fn with_backend( + path: &std::path::Path, + f: impl FnOnce(&Arc>) -> R, + ) -> R { + let backend = Arc::new( + DbBackend::::new( + DatabaseSettings { + trie_cache_maximum_size: Some(16 * 1024 * 1024), + state_pruning: Some(PruningMode::ArchiveAll), + blocks_pruning: BlocksPruning::KeepAll, + pruning_filters: Default::default(), + source: DatabaseSource::ParityDb { path: path.to_path_buf() }, + metrics_registry: None, + }, + 0, + ) + .expect("open backend"), + ); + let result = f(&backend); + // `backend` (and any clones held by the closure) drop here, closing parity-db. + result + } + + let tmp = tempfile::tempdir().expect("tempdir"); + let path = tmp.path(); + + let hash_a = Hash::repeat_byte(0x10); + let hash_b = Hash::repeat_byte(0x20); + let entry_a = create_test_entry(10_000); + let entry_b = create_test_entry(20_000); + + // Write `a` and `b` via the same path block-import uses, then close. 
+ with_backend(path, |backend| { + let pairs: Vec<_> = prepare_aux_data::(hash_a, entry_a.time_ms, &entry_a.proof) + .chain(prepare_aux_data::(hash_b, entry_b.time_ms, &entry_b.proof)) + .collect(); + let refs: Vec<_> = pairs.iter().map(|(k, v)| (k.as_slice(), v.as_slice())).collect(); + AuxStore::insert_aux(&**backend, &refs, &[]).expect("aux insert"); + }); + + // Restart: confirm both entries survived, then apply a finality-style delete of `a`. + with_backend(path, |backend| { + let store = UnincludedSegmentStore::::new(backend.clone()); + assert_eq!(store.load(hash_a).expect("load a"), Some(entry_a.clone())); + assert_eq!(store.load(hash_b).expect("load b"), Some(entry_b.clone())); + + let ops = finality_cleanup_ops::( + hash_a, + Hash::repeat_byte(0xF0), + &[], + std::iter::empty::(), + ); + let delete_keys: Vec<_> = + ops.iter().filter_map(|(k, v)| v.is_none().then(|| k.as_slice())).collect(); + AuxStore::insert_aux(&**backend, &[], &delete_keys).expect("delete"); + }); + + // Restart: the delete must have persisted; `b` must still be there. + with_backend(path, |backend| { + let store = UnincludedSegmentStore::::new(backend.clone()); + assert_eq!(store.load(hash_a).expect("load a"), None, "hash_a delete must persist"); + assert_eq!(store.load(hash_b).expect("load b"), Some(entry_b)); + }); + + // `tmp` drops here, recursively removing the parity-db directory. 
+ } +} diff --git a/cumulus/pallets/aura-ext/Cargo.toml b/cumulus/pallets/aura-ext/Cargo.toml index ad633865e0fb4..1721446d4db82 100644 --- a/cumulus/pallets/aura-ext/Cargo.toml +++ b/cumulus/pallets/aura-ext/Cargo.toml @@ -22,17 +22,18 @@ pallet-aura = { workspace = true } pallet-timestamp = { workspace = true } sp-application-crypto = { workspace = true } sp-consensus-aura = { workspace = true } +sp-consensus-babe = { workspace = true } sp-runtime = { workspace = true } # Cumulus cumulus-pallet-parachain-system = { workspace = true } +cumulus-primitives-core = { workspace = true } [dev-dependencies] rstest = { workspace = true } # Cumulus cumulus-pallet-parachain-system = { workspace = true, default-features = true } -cumulus-primitives-core = { workspace = true, default-features = true } cumulus-primitives-proof-size-hostfunction = { workspace = true, default-features = true } cumulus-test-relay-sproof-builder = { workspace = true, default-features = true } @@ -49,6 +50,7 @@ default = ["std"] std = [ "codec/std", "cumulus-pallet-parachain-system/std", + "cumulus-primitives-core/std", "frame-support/std", "frame-system/std", "pallet-aura/std", @@ -56,6 +58,7 @@ std = [ "scale-info/std", "sp-application-crypto/std", "sp-consensus-aura/std", + "sp-consensus-babe/std", "sp-runtime/std", ] try-runtime = [ diff --git a/cumulus/pallets/aura-ext/src/lib.rs b/cumulus/pallets/aura-ext/src/lib.rs index 5fcc09b8ef420..47ed329a29978 100644 --- a/cumulus/pallets/aura-ext/src/lib.rs +++ b/cumulus/pallets/aura-ext/src/lib.rs @@ -40,10 +40,12 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, LazyBlock}; pub mod consensus_hook; pub mod migration; +pub mod signature_verifier; #[cfg(test)] mod test; pub use consensus_hook::FixedVelocityConsensusHook; +pub use signature_verifier::AuraSchedulingVerifier; type Aura = pallet_aura::Pallet; diff --git a/cumulus/pallets/aura-ext/src/signature_verifier.rs b/cumulus/pallets/aura-ext/src/signature_verifier.rs new file mode 
100644 index 0000000000000..568b849010e61 --- /dev/null +++ b/cumulus/pallets/aura-ext/src/signature_verifier.rs @@ -0,0 +1,96 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. +// SPDX-License-Identifier: Apache-2.0 + +//! V3 scheduling signature verifier backed by parachain Aura authorities. +//! +//! Implements [`VerifySchedulingSignature`] for parachains running Aura: reads the relay +//! slot off the BABE pre-digest of the `internal_scheduling_parent` header, converts it +//! to the parachain slot, looks up the eligible Aura author from this pallet's cached +//! authority set, and verifies the 64-byte signature in [`SignedSchedulingInfo`] over the +//! encoded [`SchedulingInfoPayload`]. + +use crate::{Authorities, Config}; +use codec::{Decode, Encode}; +use cumulus_primitives_core::{ + relay_chain::Header as RelayChainHeader, SignedSchedulingInfo, VerifySchedulingSignature, +}; +use sp_application_crypto::RuntimeAppPublic; +use sp_consensus_aura::Slot; +use sp_consensus_babe::digests::CompatibleDigestItem as BabeDigestItem; + +/// Polkadot/Kusama relay chain slot duration in milliseconds. +const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000; + +/// Verifier for V3 [`SignedSchedulingInfo`] against parachain Aura authorities. +/// +/// Wired by the parachain runtime as +/// `type SchedulingSignatureVerifier = AuraSchedulingVerifier;` on +/// [`cumulus_pallet_parachain_system::Config`]. +/// +/// `T` is the runtime; the Aura crypto is derived from +/// [`pallet_aura::Config::AuthorityId`] (typically `sr25519` or `ed25519`). The +/// signature blob in [`SignedSchedulingInfo`] is decoded into +/// `::Signature` and verified with the +/// authority's own `verify` method, matching the existing Aura seal verification path. 
+pub struct AuraSchedulingVerifier(core::marker::PhantomData); + +impl VerifySchedulingSignature for AuraSchedulingVerifier +where + T: Config, + T: pallet_timestamp::Config, +{ + const V3_SCHEDULING_ENABLED: bool = true; + + fn verify( + signed_info: &SignedSchedulingInfo, + internal_scheduling_parent_header: &RelayChainHeader, + ) -> bool { + // 1. Decode the relay slot from the BABE pre-digest of the + // internal_scheduling_parent header. The eligible parachain author is + // determined by this slot. + let relay_slot: Slot = match internal_scheduling_parent_header + .digest + .logs() + .iter() + .find_map(|log| BabeDigestItem::as_babe_pre_digest(log)) + { + Some(pre_digest) => pre_digest.slot(), + None => return false, + }; + + // 2. Convert relay slot to parachain slot. Both slot durations are in milliseconds; the + // relay slot duration is fixed at 6s and the para slot duration is read from + // pallet-aura. + let para_slot_duration: u64 = + match TryInto::::try_into(pallet_aura::Pallet::::slot_duration()) { + Ok(d) if d > 0 => d, + _ => return false, + }; + let para_slot: u64 = u64::from(relay_slot) + .saturating_mul(RELAY_CHAIN_SLOT_DURATION_MILLIS) + .checked_div(para_slot_duration) + .unwrap_or(0); + + // 3. Look up the eligible Aura author. Use the cached authority set rather than + // `pallet_aura::Authorities` because aura-ext's cache is captured at on_initialize for + // verification of the current PoV. + let authorities = Authorities::::get(); + if authorities.is_empty() { + return false; + } + let author_idx = (para_slot % authorities.len() as u64) as usize; + let author = &authorities[author_idx]; + + // 4. Decode the 64-byte signature blob as the authority's expected signature type and + // verify over the encoded SchedulingInfoPayload. 
+ let signature = match ::Signature::decode( + &mut &signed_info.signature[..], + ) { + Ok(sig) => sig, + Err(_) => return false, + }; + + author.verify(&signed_info.payload.encode(), &signature) + } +} diff --git a/cumulus/pallets/aura-ext/src/test.rs b/cumulus/pallets/aura-ext/src/test.rs index 9ec6d91ced03f..a3a85830fc016 100644 --- a/cumulus/pallets/aura-ext/src/test.rs +++ b/cumulus/pallets/aura-ext/src/test.rs @@ -522,3 +522,184 @@ fn block_executor_does_not_influence_proof_size_recordings() { BlockExecutor::::execute_verified_block(block); }); } + +// ========================================================================= +// AuraSchedulingVerifier tests +// ========================================================================= + +mod signature_verifier_tests { + use super::*; + use crate::{signature_verifier::AuraSchedulingVerifier, Authorities}; + use codec::Encode; + use cumulus_primitives_core::{ + relay_chain::Header as RelayHeader, CoreSelector, SchedulingInfoPayload, + SignedSchedulingInfo, VerifySchedulingSignature, + }; + use sp_consensus_babe::{ + digests::{PreDigest, SecondaryPlainPreDigest}, + BABE_ENGINE_ID, + }; + use sp_runtime::generic::{Digest, DigestItem}; + + fn header_with_relay_slot(slot: u64) -> RelayHeader { + let pre_digest = PreDigest::SecondaryPlain(SecondaryPlainPreDigest { + authority_index: 0, + slot: Slot::from(slot), + }); + let digest = + Digest { logs: vec![DigestItem::PreRuntime(BABE_ENGINE_ID, pre_digest.encode())] }; + RelayHeader::new(1u32, H256::zero(), H256::zero(), H256::zero(), digest) + } + + fn put_authorities(authorities: Vec) { + let bounded: BoundedVec<_, _> = BoundedVec::truncate_from(authorities); + Authorities::::put(bounded); + } + + fn signed_info_for( + signer: sp_keyring::Sr25519Keyring, + core_selector: CoreSelector, + internal_sp: H256, + ) -> SignedSchedulingInfo { + let payload = SchedulingInfoPayload::new(core_selector, 0, Default::default(), internal_sp); + let sig = 
signer.sign(&payload.encode()); + SignedSchedulingInfo { payload, signature: sig.0 } + } + + #[test] + fn verifies_valid_signature_by_eligible_author() { + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + // Two authorities, Alice at index 0 and Bob at index 1. + put_authorities(vec![Alice.public().into(), Bob.public().into()]); + + // IP slot 4 → para slot 4 → author = authorities[4 % 2] = Alice. + let header = header_with_relay_slot(4); + let internal_sp = H256::repeat_byte(0xAB); + let signed = signed_info_for(Alice, CoreSelector(7), internal_sp); + + assert!(AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn rejects_signature_by_wrong_author() { + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + // Alice at index 0 is the eligible author for IP slot 4 (4 % 2 == 0). + // Bob signs instead → verification must fail. + put_authorities(vec![Alice.public().into(), Bob.public().into()]); + + let header = header_with_relay_slot(4); + let internal_sp = H256::repeat_byte(0xAB); + let signed = signed_info_for(Bob, CoreSelector(0), internal_sp); + + assert!(!AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn rejects_signature_when_internal_scheduling_parent_differs() { + // Tampering with `signed_info.payload.internal_scheduling_parent` after signing + // must invalidate the signature: the payload binding is part of the signed bytes. + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + put_authorities(vec![Alice.public().into(), Bob.public().into()]); + + let header = header_with_relay_slot(4); + let signed_internal_sp = H256::repeat_byte(0xAB); + let mut signed = signed_info_for(Alice, CoreSelector(0), signed_internal_sp); + // Repoint the payload at a different ISP without re-signing. 
+ signed.payload.internal_scheduling_parent = H256::repeat_byte(0xCD); + + assert!(!AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn rejects_signature_when_core_selector_differs() { + // Signing payload with CoreSelector(1), passing in CoreSelector(2) → reject. + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + put_authorities(vec![Alice.public().into(), Bob.public().into()]); + + let header = header_with_relay_slot(4); + let internal_sp = H256::repeat_byte(0xAB); + let mut signed = signed_info_for(Alice, CoreSelector(1), internal_sp); + // Tamper with the core selector field but keep the (now stale) signature. + signed.payload.core_selector = CoreSelector(2); + + assert!(!AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn rejects_when_header_has_no_babe_pre_digest() { + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + put_authorities(vec![Alice.public().into()]); + + // Header with no digest items → no relay slot → reject. + let header = + RelayHeader::new(1u32, H256::zero(), H256::zero(), H256::zero(), Digest::default()); + let internal_sp = H256::repeat_byte(0xAB); + let signed = signed_info_for(Alice, CoreSelector(0), internal_sp); + + assert!(!AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn rejects_when_authorities_empty() { + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + put_authorities(vec![]); + + let header = header_with_relay_slot(4); + let internal_sp = H256::repeat_byte(0xAB); + let signed = signed_info_for(Alice, CoreSelector(0), internal_sp); + + assert!(!AuraSchedulingVerifier::::verify(&signed, &header)); + }); + } + + #[test] + fn slot_lookup_uses_the_header_passed_in_not_some_other_source() { + // Regression test for the descendant-header bug. + // + // The verifier MUST derive the IP slot from the BABE pre-digest of the header + // it receives. 
It must NOT silently fall back to some other source (a stored + // slot, a freshest-tip header, etc.). + // + // This test wires two IP headers with slots picking *different* Aura authors, + // then proves that swapping which header is passed in flips which author the + // verifier accepts. + TestSlotDuration::set_slot_duration(6000); + new_test_ext(0).execute_with(|| { + // 2 authorities → author = authorities[ip_slot % 2]. + // Slot 4 → Alice (index 0). Slot 5 → Bob (index 1). + put_authorities(vec![Alice.public().into(), Bob.public().into()]); + + let isp_header_alice_slot = header_with_relay_slot(4); + let isp_header_bob_slot = header_with_relay_slot(5); + let internal_sp = H256::repeat_byte(0xAB); + + // Alice signs (she is the eligible author at slot 4). + let signed = signed_info_for(Alice, CoreSelector(0), internal_sp); + + // Passing the Alice-aligned IP header → verifier looks up at slot 4 → + // expects Alice → accepts. + assert!( + AuraSchedulingVerifier::::verify(&signed, &isp_header_alice_slot), + "verifier must accept Alice's signature when given an ISP header at slot 4", + ); + + // Passing a different IP header at a slot whose author is Bob → rejects. + assert!( + !AuraSchedulingVerifier::::verify(&signed, &isp_header_bob_slot), + "verifier must reject Alice's signature when given an ISP header at \ + a slot whose eligible author is Bob", + ); + }); + } +} diff --git a/cumulus/pallets/parachain-system/src/validate_block/implementation.rs b/cumulus/pallets/parachain-system/src/validate_block/implementation.rs index 788e4cd26960d..f02981c2582f0 100644 --- a/cumulus/pallets/parachain-system/src/validate_block/implementation.rs +++ b/cumulus/pallets/parachain-system/src/validate_block/implementation.rs @@ -136,7 +136,9 @@ where sp_io::transaction_index::host_renew.replace_implementation(host_transaction_index_renew), ); - // V3 scheduling validation. + // V3 scheduling validation (chain-shape only). 
Signature verification of + // `signed_scheduling_info` happens here at the call site so the verifier wiring + // stays out of the pure shape check. let validated_scheduling = scheduling::validate_v3_scheduling( PSC::SchedulingSignatureVerifier::V3_SCHEDULING_ENABLED, &extension.0, @@ -144,8 +146,20 @@ where PSC::RelayParentOffset::get(), ); if let Some(result) = validated_scheduling { + // Verify only on resubmission. For initial submissions `signed_scheduling_info` is + // optional and core selection comes from the block's UMP signals, so there is + // nothing to verify here. if result.is_resubmission { - panic!("Resubmission not yet supported; reject candidate."); + if let Some(proof) = block_data.scheduling_proof() { + if let Some(signed_info) = proof.signed_scheduling_info.as_ref() { + if !PSC::SchedulingSignatureVerifier::verify( + signed_info, + &proof.internal_scheduling_parent_header, + ) { + panic!("V3 scheduling validation failed: invalid signed_scheduling_info"); + } + } + } } } diff --git a/cumulus/pallets/parachain-system/src/validate_block/scheduling.rs b/cumulus/pallets/parachain-system/src/validate_block/scheduling.rs index 16bfd15fbd75d..70c86715d30b9 100644 --- a/cumulus/pallets/parachain-system/src/validate_block/scheduling.rs +++ b/cumulus/pallets/parachain-system/src/validate_block/scheduling.rs @@ -30,6 +30,12 @@ pub enum SchedulingValidationError { /// When relay_parent != internal_scheduling_parent, the resubmitting collator must /// sign the core selection to prove slot eligibility. MissingSignedSchedulingInfo, + /// `internal_scheduling_parent_header` does not hash to the internal scheduling + /// parent derived from the header chain (or `scheduling_parent` when the chain + /// is empty). The verifier reads the BABE pre-digest out of this header for the + /// slot lookup; without the linkage check a collator could attach an unrelated + /// header pointing the verifier at an arbitrary slot. 
+ InternalSchedulingParentHeaderMismatch, } /// Result of successful scheduling validation. @@ -43,8 +49,14 @@ pub struct SchedulingValidationResult { /// Validate V3 scheduling based on runtime config and candidate extension. /// -/// Returns `None` for V1/V2 candidates, `Some(result)` for valid V3. -/// Panics on config/extension mismatches or validation failures. +/// Returns `None` for V1/V2 candidates, `Some(result)` for valid V3. Panics on +/// config/extension mismatches or chain-shape validation failures. +/// +/// This function only validates the *shape* of the scheduling proof (header chain +/// linkage, relay-parent position, presence of `signed_scheduling_info` when +/// required). Signature verification on `signed_scheduling_info` is the caller's +/// responsibility — see `validate_block` for the call site that invokes +/// `PSC::SchedulingSignatureVerifier` using the returned `internal_scheduling_parent`. pub fn validate_v3_scheduling( v3_enabled: bool, extension: &Option, @@ -158,10 +170,18 @@ pub fn check_scheduling( // Signature verification is done separately after slot/authority lookup } // Note: For initial submission (relay_parent == internal_scheduling_parent), - // signed_scheduling_info is optional. If absent, core selection comes from the - // block's UMP signals. If present, signature verification is still performed. - // Collators should refuse to acknowledge blocks with invalid scheduling info, - // so providing signed_scheduling_info is not necessary but is legal. + // signed_scheduling_info is optional and core selection comes from the block's + // UMP signals. Verification (when applicable) happens at the call site, not here. + + // 6. The internal_scheduling_parent_header carried in the proof must hash to the + // internal_scheduling_parent we just derived. 
The verifier reads the BABE pre-digest + // out of this header for the slot lookup; without the linkage check a collator + // could attach an unrelated header pointing the verifier at an arbitrary slot. + // This also pins the ISP header for initial submissions, where the derived ISP equals + // `relay_parent` from the extension. + if scheduling_proof.internal_scheduling_parent_header.hash() != internal_scheduling_parent { + return Err(SchedulingValidationError::InternalSchedulingParentHeaderMismatch); + } Ok(SchedulingValidationResult { internal_scheduling_parent, @@ -641,4 +661,30 @@ mod tests { // Should panic because resubmission requires signed_scheduling_info validate_v3_scheduling(true, &Some(ext), Some(&proof), 3); } + + #[test] + fn empty_chain_with_signed_info_is_legal() { + // An empty chain plus `signed_scheduling_info` is structurally legal: it + // represents an initial submission with an explicit core selector. Signature + // verification (when wired) happens at the call site, not here. 
+ let (_, isp_header) = make_header_chain(0); + let scheduling_parent = isp_header.hash(); + let relay_parent = scheduling_parent; + let proof = SchedulingProof { + header_chain: vec![], + internal_scheduling_parent_header: isp_header.clone(), + signed_scheduling_info: Some(SignedSchedulingInfo { + payload: SchedulingInfoPayload::new( + CoreSelector(0), + 0, + Default::default(), + scheduling_parent, + ), + signature: dummy_signature(), + }), + }; + let result = check_scheduling(&proof, relay_parent, scheduling_parent, 0); + assert!(result.is_ok()); + assert!(!result.unwrap().is_resubmission); + } } diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 4936559284e6b..33a694cef72c8 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -648,6 +648,7 @@ where CIDP: CreateInherentDataProviders + 'static, CIDP::InherentDataProviders: Send, CHP: cumulus_client_consensus_common::ValidationCodeHashProvider + + Clone + Send + Sync + 'static, diff --git a/cumulus/primitives/core/src/scheduling.rs b/cumulus/primitives/core/src/scheduling.rs index 74859a6ad8625..305bac4c316e2 100644 --- a/cumulus/primitives/core/src/scheduling.rs +++ b/cumulus/primitives/core/src/scheduling.rs @@ -15,8 +15,10 @@ //! The `relay_parent` stays the same since the execution context hasn't changed. //! //! For resubmission, `signed_scheduling_info` must be provided. The resubmitting -//! collator signs the core selection, proving they are the eligible author for the -//! slot derived from the `internal_scheduling_parent`. +//! collator signs the core selection, proving they are the eligible parachain author +//! for the slot derived from `internal_scheduling_parent`. The `internal_scheduling_parent` +//! is also bundled into the signed payload so the signature binds to this specific +//! scheduling chain and is not reusable on a different one. 
use alloc::vec::Vec; use codec::{Decode, Encode}; diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index b523babed1848..ebaa6a63bda31 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -90,7 +90,7 @@ use error::{Error, Result}; use futures::{channel::oneshot, future::FutureExt, select}; use polkadot_node_primitives::{ AvailableData, Collation, CollationGenerationConfig, CollationSecondedSignal, PoV, - SubmitCollationParams, + SegmentCollation, SubmitCollationParams, SubmitSegmentParams, }; use polkadot_node_subsystem::{ messages::{CollationGenerationMessage, CollatorProtocolMessage, RuntimeApiMessage}, @@ -203,6 +203,15 @@ impl CollationGenerationSubsystem { false }, + Ok(FromOrchestra::Communication { + msg: CollationGenerationMessage::SubmitSegment(params), + }) => { + if let Err(err) = self.handle_submit_segment(params, ctx).await { + gum::error!(target: LOG_TARGET, ?err, "Failed to submit segment"); + } + + false + }, Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => false, Err(err) => { gum::error!( @@ -276,6 +285,54 @@ impl CollationGenerationSubsystem { Ok(()) } + // TODO: once the V4 collator-protocol lands, distribute the receipts for the whole segment as + // a single unit instead of one collation at a time, and drop the length-1 restriction. 
+ async fn handle_submit_segment( + &mut self, + params: SubmitSegmentParams, + ctx: &mut Context, + ) -> Result<()> { + let SubmitSegmentParams { scheduling_parent, core_index, mut collations } = params; + + if collations.is_empty() { + gum::warn!(target: LOG_TARGET, "received empty segment submission"); + return Ok(()); + } + + if collations.len() > 1 { + gum::warn!( + target: LOG_TARGET, + len = collations.len(), + "multi-collation segment submission not yet supported; ignoring", + ); + return Ok(()); + } + + let SegmentCollation { + relay_parent, + collation, + validation_code_hash, + result_sender, + session_index, + validation_data, + } = collations.pop().expect("len == 1; qed"); + + self.handle_submit_collation( + SubmitCollationParams { + relay_parent, + collation, + validation_code_hash, + result_sender, + core_index, + scheduling_parent, + session_index, + validation_data, + }, + ctx, + ) + .await + } + async fn handle_new_activation( &mut self, maybe_activated: Option, diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index 3e34c3b40332c..44669c17157e6 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -553,6 +553,47 @@ pub struct SubmitCollationParams { pub validation_data: PersistedValidationData, } +/// A single collation in a segment submitted via `CollationGenerationMessage::SubmitSegment`. +#[derive(Debug)] +pub struct SegmentCollation { + /// The relay-parent the collation is built against. + pub relay_parent: Hash, + /// The collation itself (PoV and commitments). + pub collation: Collation, + /// The hash of the validation code the collation was created against. + pub validation_code_hash: ValidationCodeHash, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// See [`SubmitCollationParams::result_sender`] for the same caveats. + pub result_sender: Option>, + /// The session index of the relay parent. 
Goes into the candidate descriptor. + /// Must be provided by the caller because the relay parent's state may be pruned. + pub session_index: SessionIndex, + /// The persisted validation data for this collation. The `parent_head` field must be set + /// to the correct parent head-data for the parablock being submitted. + pub validation_data: PersistedValidationData, +} + +/// Parameters for `CollationGenerationMessage::SubmitSegment`. +/// +/// Submits multiple collations that share a common scheduling parent and target core. Each +/// [`SegmentCollation`] in `collations` carries the fields that may differ between blocks of the +/// segment (relay parent, collation payload, validation data, etc.). +#[derive(Debug)] +pub struct SubmitSegmentParams { + /// The scheduling parent for V3 candidate descriptors, shared by all collations in the + /// segment. If set, every candidate descriptor will use this as the scheduling parent + /// (creating V3 descriptors). If `None`, each collation's `relay_parent` is used (V2 + /// descriptors). + /// + /// WARNING: Should only be set if the `CandidateReceiptV3` node feature is set. + pub scheduling_parent: Option, + /// The core index on which the resulting candidates should be backed. + pub core_index: CoreIndex, + /// The collations in this segment, in the order they should be submitted. + pub collations: Vec, +} + /// This is the data we keep available for each candidate included in the relay chain. 
#[derive(Clone, Encode, Decode, PartialEq, Eq, Debug)] pub struct AvailableData { diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 7117f16999dc3..74821703b0799 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -39,7 +39,7 @@ use polkadot_node_primitives::{ AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig, CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV, SignedDisputeStatement, SignedFullStatement, SignedFullStatementWithPVD, SubmitCollationParams, - ValidationResult, + SubmitSegmentParams, ValidationResult, }; use polkadot_primitives::{ self, @@ -989,6 +989,12 @@ pub enum CollationGenerationMessage { /// /// If sent before `Initialize`, this will be ignored. SubmitCollation(SubmitCollationParams), + /// Submit a segment of collations that share a scheduling parent and target core. Each + /// collation is packaged into a signed [`CommittedCandidateReceipt`] and distributed to + /// validators, in the order they appear in [`SubmitSegmentParams::collations`]. + /// + /// If sent before `Initialize`, this will be ignored. + SubmitSegment(SubmitSegmentParams), } /// The result type of [`ApprovalVotingMessage::ImportAssignment`] request.