diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index eb9a128feb..194fa2b8fc 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -448,6 +448,7 @@ impl AllSync { all_forks, slab_insertion: self.shared.sources.vacant_entry(), warp_sync: &mut self.warp_sync, + best_block_number, marker: PhantomData, }) } @@ -456,6 +457,7 @@ impl AllSync { all_forks, slab_insertion: self.shared.sources.vacant_entry(), warp_sync: &mut self.warp_sync, + best_block_number, marker: PhantomData, }) } @@ -464,6 +466,7 @@ impl AllSync { all_forks, slab_insertion: self.shared.sources.vacant_entry(), warp_sync: &mut self.warp_sync, + best_block_number, marker: PhantomData, }) } @@ -472,6 +475,7 @@ impl AllSync { all_forks, slab_insertion: self.shared.sources.vacant_entry(), warp_sync: &mut self.warp_sync, + best_block_number, marker: PhantomData, }) } @@ -1520,6 +1524,7 @@ pub struct AddSourceOldBlock<'a, TRq, TSrc, TBl> { all_forks: all_forks::AddSourceOldBlock<'a, Option, AllForksRequestExtra, AllForksSourceExtra>, warp_sync: &'a mut Option>, + best_block_number: u64, marker: PhantomData, } @@ -1538,7 +1543,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceOldBlock<'a, TRq, TSrc, TBl> { .add_source(AllForksSourceExtra { outer_source_id }); let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync { - Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id })) + Some(warp_sync.add_source( + WarpSyncSourceExtra { outer_source_id }, + self.best_block_number, + )) } else { None }; @@ -1561,6 +1569,7 @@ pub struct AddSourceKnown<'a, TRq, TSrc, TBl> { all_forks: all_forks::AddSourceKnown<'a, Option, AllForksRequestExtra, AllForksSourceExtra>, warp_sync: &'a mut Option>, + best_block_number: u64, marker: PhantomData, } @@ -1587,7 +1596,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceKnown<'a, TRq, TSrc, TBl> { .add_source(AllForksSourceExtra { outer_source_id }); let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync { - Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id })) + Some(warp_sync.add_source( + WarpSyncSourceExtra { outer_source_id }, + self.best_block_number, + )) } else { None }; @@ -1610,6 +1622,7 @@ pub struct AddSourceUnknown<'a, TRq, TSrc, TBl> { all_forks: all_forks::AddSourceUnknown<'a, Option, AllForksRequestExtra, AllForksSourceExtra>, warp_sync: &'a mut Option>, + best_block_number: u64, marker: PhantomData, } @@ -1636,7 +1649,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceUnknown<'a, TRq, TSrc, TBl> { ); let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync { - Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id })) + Some(warp_sync.add_source( + WarpSyncSourceExtra { outer_source_id }, + self.best_block_number, + )) } else { None }; diff --git a/lib/src/sync/warp_sync.rs b/lib/src/sync/warp_sync.rs index 2dcea3b69b..3471fb56b4 100644 --- a/lib/src/sync/warp_sync.rs +++ b/lib/src/sync/warp_sync.rs @@ -250,7 +250,7 @@ pub fn start_warp_sync( download_all_chain_information_storage_proofs: config .download_all_chain_information_storage_proofs, sources: slab::Slab::with_capacity(config.sources_capacity), - sources_by_finalized_height: BTreeSet::new(), + sources_by_opt_finalized_height: BTreeSet::new(), in_progress_requests: slab::Slab::with_capacity(config.requests_capacity), in_progress_requests_by_source: BTreeSet::new(), warp_sync_fragments_download: None, @@ -353,9 +353,9 @@ pub struct WarpSync { download_all_chain_information_storage_proofs: bool, /// List of requests that have been added using [`WarpSync::add_source`]. sources: slab::Slab>, - /// Subset of the entries as [`WarpSync::sources`] whose [`Source::finalized_block_height`] - /// is `Ok`. Indexed by [`Source::finalized_block_height`]. - sources_by_finalized_height: BTreeSet<(u64, SourceId)>, + /// Subset of the entries as [`WarpSync::sources`] whose [`Source::optimistic_finalized_block_height`] + /// is `Ok`. Indexed by [`Source::optimistic_finalized_block_height`]. + sources_by_opt_finalized_height: BTreeSet<(u64, SourceId)>, /// List of requests that have been added using [`WarpSync::add_request`]. in_progress_requests: slab::Slab<(SourceId, TRq, RequestDetail)>, /// Identical to [`WarpSync::in_progress_requests`], but indexed differently. @@ -378,8 +378,8 @@ pub struct WarpSync { struct Source { /// User data chosen by the API user. user_data: TSrc, - /// Height of the finalized block of the source, as reported by the source. - finalized_block_height: u64, + /// Initialized from the source's best block adjusted as real GrandPa neighbor-packet arrives. + optimistic_finalized_block_height: u64, } /// SeeĀ [`WarpSync::warped_block_ty`]. @@ -640,17 +640,20 @@ impl WarpSync { /// Add a source to the list of sources. /// - /// The source has a finalized block height of 0, which should later be updated using - /// [`WarpSync::set_source_finality_state`]. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { + /// `best_block_number` is used as the initial optimistically finalized block height + /// for this source. + /// It can later be updated using [`WarpSync::set_source_finality_state`]. + pub fn add_source(&mut self, user_data: TSrc, best_block_number: u64) -> SourceId { let source_id = SourceId(self.sources.insert(Source { user_data, - finalized_block_height: 0, + optimistic_finalized_block_height: best_block_number, })); - let _inserted = self.sources_by_finalized_height.insert((0, source_id)); + let _inserted = self + .sources_by_opt_finalized_height + .insert((best_block_number, source_id)); debug_assert!(_inserted); - debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len()); + debug_assert!(self.sources.len() >= self.sources_by_opt_finalized_height.len()); source_id } @@ -670,10 +673,10 @@ impl WarpSync { debug_assert!(self.sources.contains(to_remove.0)); let removed = self.sources.remove(to_remove.0); let _was_in = self - .sources_by_finalized_height - .remove(&(removed.finalized_block_height, to_remove)); + .sources_by_opt_finalized_height + .remove(&(removed.optimistic_finalized_block_height, to_remove)); debug_assert!(_was_in); - debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len()); + debug_assert!(self.sources.len() >= self.sources_by_opt_finalized_height.len()); // We make sure to not leave invalid source IDs in the state of `self`. // TODO: O(n) @@ -749,48 +752,49 @@ impl WarpSync { (removed.user_data, obsolete_requests.into_iter()) } - /// Sets the finalized block height of the given source. + /// Sets the optimistic estimate of the source's finalized block height. /// /// # Panic /// /// Panics if `source_id` is invalid. - /// - pub fn set_source_finality_state(&mut self, source_id: SourceId, finalized_block_height: u64) { - let stored_height = &mut self.sources[source_id.0].finalized_block_height; + pub fn set_source_finality_state( + &mut self, + source_id: SourceId, + optimistic_finalized_block_height: u64, + ) { + let stored_height = &mut self.sources[source_id.0].optimistic_finalized_block_height; // Small optimization. No need to do anything more if the block doesn't actuall change. - if *stored_height == finalized_block_height { + if *stored_height == optimistic_finalized_block_height { return; } - // Note that if the new finalized block is below the former one (which is not something - // that is ever supposed to happen), we should in principle cancel the requests + // Note that if the new optimistically finalized block is below the former one (which is not + // something that is ever supposed to happen), we should in principle cancel the requests // targeting that source that require a specific block height. In practice, however, // we don't care as again this isn't supposed to ever happen. While ongoing requests // might fail as a result, this is handled the same way as a regular request failure. let _was_in = self - .sources_by_finalized_height + .sources_by_opt_finalized_height .remove(&(*stored_height, source_id)); debug_assert!(_was_in); let _inserted = self - .sources_by_finalized_height - .insert((finalized_block_height, source_id)); + .sources_by_opt_finalized_height + .insert((optimistic_finalized_block_height, source_id)); debug_assert!(_inserted); - *stored_height = finalized_block_height; + *stored_height = optimistic_finalized_block_height; } - /// Gets the finalized block height of the given source. - /// - /// Equal to 0 if [`WarpSync::set_source_finality_state`] hasn't been called. + /// Gets the optimistic finalized block height of the given source. /// /// # Panic /// /// Panics if `source_id` is invalid. /// pub fn source_finality_state(&self, source_id: SourceId) -> u64 { - self.sources[source_id.0].finalized_block_height + self.sources[source_id.0].optimistic_finalized_block_height } /// Returns a list of requests that should be started in order to drive the warp syncing @@ -841,7 +845,7 @@ impl WarpSync { if let Some(verify_queue_tail_block_number) = verify_queue_tail_block_number { // Combine the request with every single available source. either::Left(self.sources.iter().filter_map(move |(src_id, src)| { - if src.finalized_block_height + if src.optimistic_finalized_block_height <= verify_queue_tail_block_number.saturating_add( u64::try_from(warp_sync_minimum_gap).unwrap_or(u64::MAX), ) @@ -899,7 +903,7 @@ impl WarpSync { // Sources are ordered by increasing finalized block height, in order to // have the highest chance for the block to not be pruned. let sources_with_block = self - .sources_by_finalized_height + .sources_by_opt_finalized_height .range((self.warped_header_number, SourceId(usize::MIN))..) .map(|(_, src_id)| src_id); @@ -931,7 +935,7 @@ impl WarpSync { // Sources are ordered by increasing finalized block height, in order to // have the highest chance for the block to not be pruned. let sources_with_block = self - .sources_by_finalized_height + .sources_by_opt_finalized_height .range((self.warped_header_number, SourceId(usize::MIN))..) .map(|(_, src_id)| src_id); @@ -970,7 +974,7 @@ impl WarpSync { // Sources are ordered by increasing finalized block height, in order to // have the highest chance for the block to not be pruned. let sources_with_block = self - .sources_by_finalized_height + .sources_by_opt_finalized_height .range((self.warped_header_number, SourceId(usize::MIN))..) .map(|(_, src_id)| src_id); @@ -1039,7 +1043,8 @@ impl WarpSync { _, BodyDownload::NotStarted, ) => { - if self.sources[source_id.0].finalized_block_height >= self.warped_header_number + if self.sources[source_id.0].optimistic_finalized_block_height + >= self.warped_header_number && *block_number == self.warped_header_number && *block_hash == self.warped_header_hash { @@ -1064,7 +1069,8 @@ impl WarpSync { Cow::Borrowed(&b":code"[..]) }; - if self.sources[source_id.0].finalized_block_height >= self.warped_header_number + if self.sources[source_id.0].optimistic_finalized_block_height + >= self.warped_header_number && *block_hash == self.warped_header_hash && keys.iter().any(|k| *k == *code_key_to_request) && keys.iter().any(|k| k == b":heappages") @@ -1086,7 +1092,7 @@ impl WarpSync { ) => { for (info, status) in &mut self.runtime_calls { if matches!(status, CallProof::NotStarted) - && self.sources[source_id.0].finalized_block_height + && self.sources[source_id.0].optimistic_finalized_block_height >= self.warped_header_number && *block_hash == self.warped_header_hash && function_name == info.function_name() @@ -1336,7 +1342,8 @@ impl WarpSync { .last() .and_then(|h| header::decode(&h.scale_encoded_header, self.block_number_bytes).ok()) { - let src_finalized_height = &mut self.sources[rq_source_id.0].finalized_block_height; + let src_opt_finalized_height = + &mut self.sources[rq_source_id.0].optimistic_finalized_block_height; let new_height = if final_set_of_fragments { // If the source indicated that this is the last fragment, then we know that @@ -1346,20 +1353,23 @@ impl WarpSync { // If this is not the last fragment, we know that the finalized block of the // source is *at least* the one provided. // TODO: could maybe do + gap or something? - cmp::max(*src_finalized_height, last_header.number.saturating_add(1)) + cmp::max( + *src_opt_finalized_height, + last_header.number.saturating_add(1), + ) }; - if *src_finalized_height != new_height { + if *src_opt_finalized_height != new_height { let _was_in = self - .sources_by_finalized_height - .remove(&(*src_finalized_height, rq_source_id)); + .sources_by_opt_finalized_height + .remove(&(*src_opt_finalized_height, rq_source_id)); debug_assert!(_was_in); - *src_finalized_height = new_height; + *src_opt_finalized_height = new_height; let _inserted = self - .sources_by_finalized_height - .insert((*src_finalized_height, rq_source_id)); + .sources_by_opt_finalized_height + .insert((*src_opt_finalized_height, rq_source_id)); debug_assert!(_inserted); } } diff --git a/light-base/src/network_service/tasks.rs b/light-base/src/network_service/tasks.rs index a9934721ec..41ef9bda89 100644 --- a/light-base/src/network_service/tasks.rs +++ b/light-base/src/network_service/tasks.rs @@ -20,7 +20,7 @@ use crate::{ platform::{PlatformRef, SubstreamDirection}, }; -use alloc::{boxed::Box, string::String}; +use alloc::{boxed::Box, string::String, sync::Arc}; use core::{pin, time::Duration}; use futures_lite::FutureExt as _; use futures_util::{StreamExt as _, future, stream::FuturesUnordered}; @@ -251,6 +251,11 @@ pub(super) async fn webrtc_multi_stream_connection_task( let mut next_substream_id = 0; // We need to pin the receiver, as the type doesn't implement `Unpin`. let mut coordinator_to_connection = pin::pin!(coordinator_to_connection); + // Shared event used to wake up all substream wait futures when + // `inject_coordinator_message` queues write data (e.g. AcceptInNotifications). + // Without this, the substream would never be re-polled since the platform's + // `wait_read_write_again` only wakes on incoming data or timers. + let coordinator_write_ready = Arc::new(event_listener::Event::new()); loop { // Start opening new outbound substreams, if needed. @@ -347,6 +352,12 @@ pub(super) async fn webrtc_multi_stream_connection_task( match wake_up_reason { WakeUpReason::CoordinatorMessage(message) => { connection_task.inject_coordinator_message(&platform.now(), message); + // Wake up all substream wait futures so they get re-polled. + // This is necessary because inject_coordinator_message may have + // queued write data (e.g. AcceptInNotifications queues a handshake + // response) but the platform's wait_read_write_again only wakes on + // incoming data or timers -- not on pending writes. + coordinator_write_ready.notify(usize::MAX); } WakeUpReason::CoordinatorDead => { log!( @@ -453,8 +464,16 @@ pub(super) async fn webrtc_multi_stream_connection_task( if let SubstreamFate::Continue = substream_fate { when_substreams_rw_ready.push({ let platform = platform.clone(); + let write_ready = coordinator_write_ready.clone(); + let write_listener = write_ready.listen(); Box::pin(async move { - platform.wait_read_write_again(socket.as_mut()).await; + // Wait for either platform data or a coordinator write + // notification (e.g. after AcceptInNotifications queues a + // handshake response that needs to be flushed). + platform + .wait_read_write_again(socket.as_mut()) + .or(write_listener) + .await; (socket, substream_id) }) });