diff --git a/Cargo.lock b/Cargo.lock
index ccee20011eab0..6f85bb6fed38e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5146,6 +5146,42 @@ dependencies = [
"sp-trie 29.0.0",
]
+[[package]]
+name = "cumulus-client-storage-chain-sync"
+version = "0.7.0"
+dependencies = [
+ "async-trait",
+ "cid 0.11.3",
+ "futures",
+ "futures-timer",
+ "log",
+ "parity-scale-codec",
+ "prost 0.12.6",
+ "rstest",
+ "sc-client-api 28.0.0",
+ "sc-client-db",
+ "sc-consensus",
+ "sc-network 0.34.0",
+ "sc-network-sync",
+ "sp-api 26.0.0",
+ "sp-blockchain 28.0.0",
+ "sp-consensus 0.32.0",
+ "sp-core 28.0.0",
+ "sp-crypto-hashing 0.1.0",
+ "sp-externalities 0.25.0",
+ "sp-runtime 31.0.1",
+ "sp-state-machine 0.35.0",
+ "sp-tracing 16.0.0",
+ "sp-transaction-storage-proof",
+ "sp-trie 29.0.0",
+ "sp-version 29.0.0",
+ "thiserror 1.0.65",
+ "tokio",
+ "tracing",
+ "tracing-log",
+ "tracing-subscriber 0.3.19",
+]
+
[[package]]
name = "cumulus-pallet-aura-ext"
version = "0.7.0"
@@ -5798,13 +5834,16 @@ name = "cumulus-zombienet-sdk-tests"
version = "0.1.0"
dependencies = [
"anyhow",
+ "cid 0.11.3",
"cumulus-primitives-core",
"cumulus-test-runtime",
"cumulus-zombienet-sdk-helpers",
"env_logger 0.11.3",
"frame-support",
"futures",
+ "hex",
"log",
+ "multihash 0.19.5",
"parity-scale-codec",
"parity-wasm",
"polkadot-primitives",
@@ -5829,6 +5868,7 @@ dependencies = [
"subxt 0.44.2",
"subxt 0.50.0",
"subxt-signer 0.50.0",
+ "thiserror 1.0.65",
"tokio",
"verifiable",
"zombienet-configuration",
@@ -17063,6 +17103,7 @@ dependencies = [
"cumulus-client-consensus-relay-chain",
"cumulus-client-parachain-inherent",
"cumulus-client-service",
+ "cumulus-client-storage-chain-sync",
"cumulus-primitives-aura",
"cumulus-primitives-core",
"cumulus-relay-chain-interface",
@@ -17447,6 +17488,7 @@ dependencies = [
"cumulus-client-pov-recovery",
"cumulus-client-proof-size-recording",
"cumulus-client-service",
+ "cumulus-client-storage-chain-sync",
"cumulus-pallet-aura-ext",
"cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
@@ -25761,6 +25803,7 @@ dependencies = [
"scale-info",
"sp-api 26.0.0",
"sp-core 28.0.0",
+ "sp-crypto-hashing 0.1.0",
"sp-inherents 26.0.0",
"sp-runtime 31.0.1",
"sp-trie 29.0.0",
diff --git a/Cargo.toml b/Cargo.toml
index 878bd38eeac40..717c9fdb279a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,6 +77,7 @@ members = [
"cumulus/client/relay-chain-rpc-interface",
"cumulus/client/relay-chain-streams",
"cumulus/client/service",
+ "cumulus/client/storage-chain-sync",
"cumulus/pallets/ah-ops",
"cumulus/pallets/aura-ext",
"cumulus/pallets/collator-selection",
@@ -771,6 +772,7 @@ cumulus-client-parachain-inherent = { path = "cumulus/client/parachain-inherent"
cumulus-client-pov-recovery = { path = "cumulus/client/pov-recovery", default-features = false }
cumulus-client-proof-size-recording = { path = "cumulus/client/proof-size-recording", default-features = false }
cumulus-client-service = { path = "cumulus/client/service", default-features = false }
+cumulus-client-storage-chain-sync = { path = "cumulus/client/storage-chain-sync", default-features = false }
cumulus-pallet-aura-ext = { path = "cumulus/pallets/aura-ext", default-features = false }
cumulus-pallet-parachain-system = { path = "cumulus/pallets/parachain-system", default-features = false }
cumulus-pallet-parachain-system-proc-macro = { path = "cumulus/pallets/parachain-system/proc-macro", default-features = false }
diff --git a/cumulus/client/storage-chain-sync/Cargo.toml b/cumulus/client/storage-chain-sync/Cargo.toml
new file mode 100644
index 0000000000000..7d8a1bbac41bc
--- /dev/null
+++ b/cumulus/client/storage-chain-sync/Cargo.toml
@@ -0,0 +1,53 @@
+[package]
+name = "cumulus-client-storage-chain-sync"
+description = "Storage-chain bitswap peer rotation and indexed transaction fetching service"
+version = "0.7.0"
+authors.workspace = true
+edition.workspace = true
+license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
+homepage.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+async-trait = { workspace = true }
+cid = { workspace = true }
+codec = { workspace = true, default-features = true }
+futures = { workspace = true }
+futures-timer = { workspace = true }
+log = { workspace = true, default-features = true }
+sc-client-api = { workspace = true, default-features = true }
+sc-client-db = { workspace = true, default-features = true }
+sc-consensus = { workspace = true, default-features = true }
+sc-network = { workspace = true, default-features = true }
+sc-network-sync = { workspace = true, default-features = true }
+sp-api = { workspace = true, default-features = true }
+sp-blockchain = { workspace = true, default-features = true }
+sp-consensus = { workspace = true, default-features = true }
+sp-core = { workspace = true, default-features = true }
+sp-externalities = { workspace = true, default-features = true }
+sp-runtime = { workspace = true, default-features = true }
+sp-state-machine = { workspace = true, default-features = true }
+sp-transaction-storage-proof = { workspace = true, default-features = true }
+sp-trie = { workspace = true, default-features = true }
+thiserror = { workspace = true }
+
+[dev-dependencies]
+prost = { workspace = true }
+rstest = { workspace = true }
+sc-network = { workspace = true, default-features = true, features = ["test-helpers"] }
+sp-crypto-hashing = { workspace = true, default-features = true }
+sp-tracing = { workspace = true, default-features = true }
+sp-version = { workspace = true, default-features = true }
+tokio = { workspace = true, default-features = true, features = ["macros", "rt"] }
+tracing = { workspace = true, default-features = true }
+tracing-log = { workspace = true, default-features = true }
+tracing-subscriber = { workspace = true, default-features = true }
+
+[features]
+# Exposes `StorageChainBlockImport::intercept_gap_sync_for_test` so downstream test
+# crates can exercise the gap-sync dispatch path while it remains gated in production
+# behind `should_intercept`.
+test-helpers = []
diff --git a/cumulus/client/storage-chain-sync/src/fetcher.rs b/cumulus/client/storage-chain-sync/src/fetcher.rs
new file mode 100644
index 0000000000000..7c1e2f73a02ab
--- /dev/null
+++ b/cumulus/client/storage-chain-sync/src/fetcher.rs
@@ -0,0 +1,217 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see .
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Bitswap-based fetcher for indexed-transaction blobs. Owns the late-bound network and
+//! peer-source handles; rotates across connected peers per batch.
+
+use async_trait::async_trait;
+use cid::{multihash::Multihash, Cid};
+use futures::channel::oneshot;
+use sc_network::{
+ bitswap::{request_bitswap_blocks, FetchOutcome, MAX_WANTED_BLOCKS, RAW_CODEC},
+ NetworkRequest, PeerId,
+};
+use sc_network_sync::SyncingService;
+use sp_runtime::traits::Block as BlockT;
+use sp_transaction_storage_proof::{ContentHash, HashingAlgorithm};
+use std::{
+ collections::HashMap,
+ sync::{Arc, OnceLock},
+ time::Duration,
+};
+
+const LOG_TARGET: &str = "storage-chain-fetcher";
+const BITSWAP_PER_PEER_TIMEOUT: Duration = Duration::from_secs(5);
+const MAX_PEERS_PER_IMPORT: usize = 8;
+
+/// Source of currently-connected sync peer IDs. Abstracted so the fetcher can be unit-tested
+/// without spinning up a full `SyncingService`. The production blanket impl on
+/// `SyncingService` calls `peers_info()` and projects to the peer-id column.
+#[async_trait]
+pub trait BitswapPeerSource: Send + Sync {
+ async fn current_peers(&self) -> Result, oneshot::Canceled>;
+}
+
+#[async_trait]
+impl BitswapPeerSource for SyncingService {
+ async fn current_peers(&self) -> Result, oneshot::Canceled> {
+ Ok(self.peers_info().await?.into_iter().map(|(peer, _)| peer).collect())
+ }
+}
+
+/// Late-bound network request handle, populated by the omni-node after build_network.
+pub type NetworkHandle = Arc>>;
+/// Late-bound peer-source handle populated after `build_network` returns.
+pub type SyncingHandle = Arc>>;
+
+/// Infrastructure-level fetch failure.
+#[derive(Debug, thiserror::Error)]
+pub enum FetchError {
+ #[error("network handle not yet set; storage-chain blocks cannot be fetched before build_network completes")]
+ NetworkHandleUnset,
+ #[error("sync handle not yet set; storage-chain blocks cannot be fetched before build_network completes")]
+ SyncingHandleUnset,
+ #[error("failed to construct multihash for CID: {0}")]
+ Multihash(String),
+}
+
+/// Fetcher that resolves indexed-transaction hashes via bitswap.
+///
+/// Owns the late-bound network/sync handles plus the per-peer iteration policy. The block-import
+/// path holds one of these and calls [`Self::fetch_many`] for each batch of missing renew hashes.
+///
+/// Cloning is cheap: every field is an `Arc`-equivalent.
+pub struct IndexedTransactionFetcher {
+ network: NetworkHandle,
+ peer_source: SyncingHandle,
+ _phantom: std::marker::PhantomData,
+}
+
+impl Clone for IndexedTransactionFetcher {
+ fn clone(&self) -> Self {
+ Self {
+ network: self.network.clone(),
+ peer_source: self.peer_source.clone(),
+ _phantom: std::marker::PhantomData,
+ }
+ }
+}
+
+impl IndexedTransactionFetcher {
+ /// Build a new fetcher backed by the given late-bound handles.
+ pub fn new(network: NetworkHandle, peer_source: SyncingHandle) -> Self {
+ Self { network, peer_source, _phantom: std::marker::PhantomData }
+ }
+
+ /// Resolve a batch of indexed-transaction hashes via bitswap, rotating across up to
+ /// [`MAX_PEERS_PER_IMPORT`] peers. Returns only successfully fetched entries.
+ pub async fn fetch_many(
+ &self,
+ wants: &[(ContentHash, HashingAlgorithm)],
+ ) -> Result>, FetchError> {
+ if wants.is_empty() {
+ return Ok(HashMap::new());
+ }
+ let network = self.network.get().ok_or(FetchError::NetworkHandleUnset)?;
+ let peer_source = self.peer_source.get().ok_or(FetchError::SyncingHandleUnset)?;
+
+ let peers = match peer_source.current_peers().await {
+ Ok(peers) => peers,
+ Err(_) => {
+ log::warn!(target: LOG_TARGET, "current_peers() channel cancelled");
+ return Ok(HashMap::new());
+ },
+ };
+ if peers.is_empty() {
+ log::debug!(
+ target: LOG_TARGET,
+ "no connected sync peers, cannot fetch via bitswap yet",
+ );
+ return Ok(HashMap::new());
+ }
+
+ // Build per-want CIDs once; reuse across peers and chunks.
+ let cids: Vec<(ContentHash, Cid)> = wants
+ .iter()
+ .map(|(hash, algo)| {
+ let mh = Multihash::<64>::wrap(algo.multihash_code(), hash)
+ .map_err(|e| FetchError::Multihash(e.to_string()))?;
+ Ok::<_, FetchError>((*hash, Cid::new_v1(RAW_CODEC, mh)))
+ })
+ .collect::>()?;
+ let mut remaining = cids;
+ let mut acquired: HashMap> = HashMap::new();
+
+ for peer in peers.into_iter().take(MAX_PEERS_PER_IMPORT) {
+ if remaining.is_empty() {
+ break;
+ }
+ let from_peer = try_fetch_from_peer(network.as_ref(), peer, &remaining).await;
+ acquired.extend(from_peer);
+ remaining.retain(|(hash, _)| !acquired.contains_key(hash));
+ }
+
+ Ok(acquired)
+ }
+}
+
+/// Try every chunk of `wants` against a single peer in sequence. Returns whatever blocks the
+/// peer actually served. A timeout or per-chunk error aborts the remaining chunks for this peer
+/// and lets the caller move on to the next one.
+async fn try_fetch_from_peer(
+ network: &N,
+ peer: PeerId,
+ wants: &[(ContentHash, Cid)],
+) -> HashMap> {
+ let mut acquired: HashMap> = HashMap::new();
+ for chunk in wants.chunks(MAX_WANTED_BLOCKS) {
+ let cids: Vec = chunk.iter().map(|(_, cid)| *cid).collect();
+ match with_timeout(request_bitswap_blocks(network, peer, &cids), BITSWAP_PER_PEER_TIMEOUT)
+ .await
+ {
+ None => {
+ log::debug!(
+ target: LOG_TARGET,
+ "request_bitswap_blocks to {peer:?}: timeout (chunk size {})",
+ chunk.len(),
+ );
+ return acquired;
+ },
+ Some(Err(e)) => {
+ log::debug!(target: LOG_TARGET, "request_bitswap_blocks to {peer:?}: {e:?}");
+ return acquired;
+ },
+ Some(Ok(per_cid)) => {
+ for (hash, cid) in chunk {
+ if let Some(FetchOutcome::Block(data)) = per_cid.get(cid) {
+ log::debug!(
+ target: LOG_TARGET,
+ "fetched {} bytes for {:?} from {peer:?}",
+ data.len(),
+ hash,
+ );
+ acquired.insert(*hash, data.clone());
+ }
+ }
+ },
+ }
+ }
+ acquired
+}
+
+async fn with_timeout(fut: F, timeout: Duration) -> Option
+where
+ F: std::future::Future