Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lib/src/sync/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
all_forks,
slab_insertion: self.shared.sources.vacant_entry(),
warp_sync: &mut self.warp_sync,
best_block_number,
marker: PhantomData,
})
}
Expand All @@ -456,6 +457,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
all_forks,
slab_insertion: self.shared.sources.vacant_entry(),
warp_sync: &mut self.warp_sync,
best_block_number,
marker: PhantomData,
})
}
Expand All @@ -464,6 +466,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
all_forks,
slab_insertion: self.shared.sources.vacant_entry(),
warp_sync: &mut self.warp_sync,
best_block_number,
marker: PhantomData,
})
}
Expand All @@ -472,6 +475,7 @@ impl<TRq, TSrc, TBl> AllSync<TRq, TSrc, TBl> {
all_forks,
slab_insertion: self.shared.sources.vacant_entry(),
warp_sync: &mut self.warp_sync,
best_block_number,
marker: PhantomData,
})
}
Expand Down Expand Up @@ -1520,6 +1524,7 @@ pub struct AddSourceOldBlock<'a, TRq, TSrc, TBl> {
all_forks:
all_forks::AddSourceOldBlock<'a, Option<TBl>, AllForksRequestExtra, AllForksSourceExtra>,
warp_sync: &'a mut Option<warp_sync::WarpSync<WarpSyncSourceExtra, WarpSyncRequestExtra>>,
best_block_number: u64,
marker: PhantomData<TRq>,
}

Expand All @@ -1538,7 +1543,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceOldBlock<'a, TRq, TSrc, TBl> {
.add_source(AllForksSourceExtra { outer_source_id });

let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync {
Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id }))
Some(warp_sync.add_source(
WarpSyncSourceExtra { outer_source_id },
self.best_block_number,
))
} else {
None
};
Expand All @@ -1561,6 +1569,7 @@ pub struct AddSourceKnown<'a, TRq, TSrc, TBl> {
all_forks:
all_forks::AddSourceKnown<'a, Option<TBl>, AllForksRequestExtra, AllForksSourceExtra>,
warp_sync: &'a mut Option<warp_sync::WarpSync<WarpSyncSourceExtra, WarpSyncRequestExtra>>,
best_block_number: u64,
marker: PhantomData<TRq>,
}

Expand All @@ -1587,7 +1596,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceKnown<'a, TRq, TSrc, TBl> {
.add_source(AllForksSourceExtra { outer_source_id });

let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync {
Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id }))
Some(warp_sync.add_source(
WarpSyncSourceExtra { outer_source_id },
self.best_block_number,
))
} else {
None
};
Expand All @@ -1610,6 +1622,7 @@ pub struct AddSourceUnknown<'a, TRq, TSrc, TBl> {
all_forks:
all_forks::AddSourceUnknown<'a, Option<TBl>, AllForksRequestExtra, AllForksSourceExtra>,
warp_sync: &'a mut Option<warp_sync::WarpSync<WarpSyncSourceExtra, WarpSyncRequestExtra>>,
best_block_number: u64,
marker: PhantomData<TRq>,
}

Expand All @@ -1636,7 +1649,10 @@ impl<'a, TRq, TSrc, TBl> AddSourceUnknown<'a, TRq, TSrc, TBl> {
);

let warp_sync_source_id = if let Some(warp_sync) = self.warp_sync {
Some(warp_sync.add_source(WarpSyncSourceExtra { outer_source_id }))
Some(warp_sync.add_source(
WarpSyncSourceExtra { outer_source_id },
self.best_block_number,
))
} else {
None
};
Expand Down
12 changes: 7 additions & 5 deletions lib/src/sync/warp_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,15 +640,17 @@ impl<TSrc, TRq> WarpSync<TSrc, TRq> {

/// Add a source to the list of sources.
///
/// The source has a finalized block height of 0, which should later be updated using
/// [`WarpSync::set_source_finality_state`].
pub fn add_source(&mut self, user_data: TSrc) -> SourceId {
/// `best_block_number` is used as the initial finalized block height for this source.
/// It can later be updated using [`WarpSync::set_source_finality_state`].
pub fn add_source(&mut self, user_data: TSrc, best_block_number: u64) -> SourceId {
let source_id = SourceId(self.sources.insert(Source {
user_data,
finalized_block_height: 0,
finalized_block_height: best_block_number,
}));

let _inserted = self.sources_by_finalized_height.insert((0, source_id));
let _inserted = self
.sources_by_finalized_height
.insert((best_block_number, source_id));
debug_assert!(_inserted);
debug_assert!(self.sources.len() >= self.sources_by_finalized_height.len());

Expand Down
23 changes: 21 additions & 2 deletions light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
platform::{PlatformRef, SubstreamDirection},
};

use alloc::{boxed::Box, string::String};
use alloc::{boxed::Box, string::String, sync::Arc};
use core::{pin, time::Duration};
use futures_lite::FutureExt as _;
use futures_util::{StreamExt as _, future, stream::FuturesUnordered};
Expand Down Expand Up @@ -251,6 +251,11 @@ pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>(
let mut next_substream_id = 0;
// We need to pin the receiver, as the type doesn't implement `Unpin`.
let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
// Shared event used to wake up all substream wait futures when
// `inject_coordinator_message` queues write data (e.g. AcceptInNotifications).
// Without this, the substream would never be re-polled since the platform's
// `wait_read_write_again` only wakes on incoming data or timers.
let coordinator_write_ready = Arc::new(event_listener::Event::new());

loop {
// Start opening new outbound substreams, if needed.
Expand Down Expand Up @@ -347,6 +352,12 @@ pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>(
match wake_up_reason {
WakeUpReason::CoordinatorMessage(message) => {
connection_task.inject_coordinator_message(&platform.now(), message);
// Wake up all substream wait futures so they get re-polled.
// This is necessary because inject_coordinator_message may have
// queued write data (e.g. AcceptInNotifications queues a handshake
// response) but the platform's wait_read_write_again only wakes on
// incoming data or timers -- not on pending writes.
coordinator_write_ready.notify(usize::MAX);
}
WakeUpReason::CoordinatorDead => {
log!(
Expand Down Expand Up @@ -453,8 +464,16 @@ pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>(
if let SubstreamFate::Continue = substream_fate {
when_substreams_rw_ready.push({
let platform = platform.clone();
let write_ready = coordinator_write_ready.clone();
Box::pin(async move {
platform.wait_read_write_again(socket.as_mut()).await;
// Wait for either platform data or a coordinator write
// notification (e.g. after AcceptInNotifications queues a
// handshake response that needs to be flushed).
let write_listener = write_ready.listen();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move the write_ready.listen() outside the Box::pin future?

Currently, the write listener is registered once the the when_substreams_rw_ready is firstly polled.
This means we could arrive in a situation where .notify(usize::MAX); is called before registering the listener call.

For this to happen we need to:

  • receive WakeUpReason::CoordinatorMessage(message)
  • message_sending.as_ref().as_pin_ref().is_none() should have some data to effectively take the None path and ignore the when_substreams_rw_ready.select_next_some()

Moving the listener above Box::pin solves this regardless of which states call when_substreams_rw_ready.poll_next and avoids the problem entirely

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are totally right, it should definitely be moved out so that it is registered and the async block later on depends on the registered event listener!

platform
.wait_read_write_again(socket.as_mut())
.or(write_listener)
.await;
(socket, substream_id)
})
});
Expand Down
Loading