Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.

use super::CollatorMessage;
use super::{CollatorMessage, CollatorResubmitSegment, SegmentKind};
use crate::{
collator::{self as collator_util, BuildBlockAndImportParams, Collator, SlotClaim},
collators::{
Expand Down Expand Up @@ -108,8 +108,10 @@ pub struct BuilderTaskParams<
pub proposer: Proposer,
/// The generic collator service used to plug into this consensus engine.
pub collator_service: CS,
/// Channel to send built blocks to the collation task.
/// Channel for V2 single-bundle collations.
pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
/// Channel for V3 segment collations.
pub resubmit_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorResubmitSegment<Block>>,
/// Slot duration of the relay chain.
pub relay_chain_slot_duration: Duration,
/// Offset all time operations by this duration.
Expand Down Expand Up @@ -185,6 +187,7 @@ where
proposer,
collator_service,
collator_sender,
resubmit_sender,
code_hash_provider,
relay_chain_slot_duration,
para_backend,
Expand Down Expand Up @@ -274,7 +277,7 @@ where
.await
.unwrap_or(0);
}
let Ok(Some(relay_parent_data)) = offset_relay_parent_find_descendants(
let Ok(Some(mut relay_parent_data)) = offset_relay_parent_find_descendants(
&mut relay_chain_data_cache,
scheduling_parent_header.clone(),
relay_parent_offset,
Expand Down Expand Up @@ -484,12 +487,43 @@ where
"Core configuration",
);

// Build the V3 scheduling proof once per slot — it's shared by every candidate in the
// segment. Consumes `relay_parent_data.descendants`, so the inherent created later
// downstream sees an empty list (which V3 verification expects).
let scheduling_proof = if v3_enabled {
let descendants = relay_parent_data.take_descendants();
let header_chain: Vec<_> = descendants.into_iter().rev().collect();
let scheduling_parent =
header_chain.first().map(|header| header.hash()).unwrap_or(relay_parent_hash);

tracing::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent_hash,
?scheduling_parent,
header_chain_len = header_chain.len(),
"Building V3 collation with scheduling proof",
);

Some(SchedulingProof {
header_chain,
internal_scheduling_parent_header: relay_parent_header.clone(),
// TODO: sign and populate when resubmission lands. The relay-chain verifier
// rejects `ResubmitOnly` / mixed segments whose proof lacks
// `signed_scheduling_info`.
signed_scheduling_info: None,
})
} else {
None
};

let mut pov_parent_header = initial_parent_header;
let mut pov_parent_hash = initial_parent_hash;
let block_time = relay_chain_slot_duration / number_of_blocks;

for blocks_per_core in blocks_per_cores {
for (core_iter_index, blocks_per_core) in blocks_per_cores.into_iter().enumerate() {
let time_for_core = slot_time.time_left() / cores.cores_left();
let is_first_core = core_iter_index == 0;
let this_core_index = cores.core_index();

match build_collation_for_core(BuildCollationParams {
pov_parent_header,
Expand All @@ -502,10 +536,11 @@ where
code_hash_provider: &code_hash_provider,
slot_claim: &slot_claim,
collator_sender: &collator_sender,
resubmit_sender: &resubmit_sender,
collator: &mut collator,
allowed_pov_size,
core_info: cores.core_info(),
core_index: cores.core_index(),
core_index: this_core_index,
block_time,
blocks_per_core,
time_for_core,
Expand All @@ -518,16 +553,33 @@ where
relay_slot,
para_slot: para_slot.slot,
para_client: &*para_client,
v3_enabled,
scheduling_proof: scheduling_proof.clone(),
})
.await
{
Ok(Some(header)) => {
pov_parent_header = header;
pov_parent_hash = pov_parent_header.hash();
},
// Let's wait for the next slot
Ok(None) => break,
Ok(None) => {
// First-core failed to build a fresh bundle: still ship the held prior
// unincluded segment via `ResubmitOnly`.
//
// TODO: seding to a certain core can apply in a custom way too, and
// other strategies for resubmitting a segment should be available - e.g.
// split segment between cores equally, or attempt grouping unincluded
// blocks based on the core index resulting from their `core_selector`
// and `claim_queue_offset` combination, at the `scheduling_parent`).
if is_first_core {
if let Some(proof) = scheduling_proof.clone() {
let _ = resubmit_sender.unbounded_send(CollatorResubmitSegment {
scheduling_proof: proof,
kind: SegmentKind::ResubmitOnly { core_index: this_core_index },
});
}
}
break;
},
Err(()) => return,
}

Expand Down Expand Up @@ -562,6 +614,7 @@ struct BuildCollationParams<
code_hash_provider: &'a CHP,
slot_claim: &'a SlotClaim<P::Public>,
collator_sender: &'a sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
resubmit_sender: &'a sc_utils::mpsc::TracingUnboundedSender<CollatorResubmitSegment<Block>>,
collator: &'a mut Collator<Block, P, BI, CIDP, RelayClient, Proposer, CS>,
allowed_pov_size: usize,
core_info: CoreInfo,
Expand All @@ -578,7 +631,8 @@ struct BuildCollationParams<
relay_slot: cumulus_primitives_aura::Slot,
para_slot: cumulus_primitives_aura::Slot,
para_client: &'a Client,
v3_enabled: bool,
/// V3 scheduling proof, built once per slot. `Some` iff V3 is enabled.
scheduling_proof: Option<SchedulingProof>,
}

/// Build a collation for one core.
Expand Down Expand Up @@ -606,6 +660,7 @@ async fn build_collation_for_core<
code_hash_provider,
slot_claim,
collator_sender,
resubmit_sender,
collator,
allowed_pov_size,
core_info,
Expand All @@ -615,13 +670,13 @@ async fn build_collation_for_core<
time_for_core: slot_time_for_core,
is_last_core_in_parachain_slot,
collator_peer_id,
mut relay_parent_data,
relay_parent_data,
total_number_of_blocks,
included_header_hash,
relay_slot,
para_slot,
para_client,
v3_enabled,
scheduling_proof,
}: BuildCollationParams<'_, Block, P, RelayClient, BI, CIDP, Proposer, CS, CHP, Client>,
) -> Result<Option<Block::Header>, ()>
where
Expand Down Expand Up @@ -649,34 +704,6 @@ where
max_pov_size,
};

// Check if V3 scheduling is enabled and build scheduling proof if so.
let mut scheduling_proof = None;
if v3_enabled {
// The relay parent descendants are only needed for v2.
let descendants = relay_parent_data.take_descendants();
// The descendants are ordered from oldest to newest, so we need to reverse them.
let header_chain: Vec<_> = descendants.into_iter().rev().collect();
let scheduling_parent =
header_chain.first().map(|header| header.hash()).unwrap_or(relay_parent_hash);

tracing::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent_hash,
?scheduling_parent,
header_chain_len = header_chain.len(),
"Building V3 collation with scheduling proof",
);

scheduling_proof = Some(SchedulingProof {
header_chain,
// Initial submission: internal_scheduling_parent == relay_parent, so the
// internal scheduling parent header is the relay parent's header itself.
internal_scheduling_parent_header: relay_parent_header.clone(),
// Initial submission: no signature needed, core selection from UMP signals
signed_scheduling_info: None,
});
}

let Some(validation_code_hash) = code_hash_provider.code_hash_at(pov_parent_hash) else {
tracing::error!(
target: LOG_TARGET,
Expand Down Expand Up @@ -902,17 +929,41 @@ where
"Sending out PoV"
);

if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
relay_parent: relay_parent_hash,
scheduling_proof,
parent_header: pov_parent_header.clone(),
blocks,
proof,
validation_code_hash,
core_index,
validation_data,
}) {
tracing::error!(target: LOG_TARGET, ?err, "Unable to send block to collation task.");
// `scheduling_proof` is `Some` iff `v3_enabled`; routes V3 bundles to `resubmit_sender`,
// V2 bundles (no proof) to `collator_sender`.
let send_ok = if let Some(scheduling_proof) = scheduling_proof {
resubmit_sender
.unbounded_send(CollatorResubmitSegment {
scheduling_proof,
kind: SegmentKind::WithBundle {
bundle: CollatorMessage {
relay_parent: relay_parent_hash,
parent_header: pov_parent_header.clone(),
blocks,
proof,
validation_code_hash,
validation_data,
core_index,
},
},
})
.is_ok()
} else {
collator_sender
.unbounded_send(CollatorMessage {
core_index,
relay_parent: relay_parent_hash,
parent_header: pov_parent_header.clone(),
blocks,
proof,
validation_code_hash,
validation_data,
})
.is_ok()
};

if !send_ok {
tracing::error!(target: LOG_TARGET, "Unable to send block to collation task.");
Err(())
} else {
// Now let's sleep for the rest of the core.
Expand Down
Loading
Loading