Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d59cb4c
webrtc: Fix InputRejected error during ICE negotiation
timwu20 Feb 4, 2026
086ac4d
webrtc: Add diagnostic logging for notification handshake debugging
timwu20 Feb 8, 2026
624fccb
webrtc: report substream open failure when channel closes unexpectedly
timwu20 Feb 26, 2026
1dfbe6e
chore(webrtc): update trace target and fin_ack timeout
gab8i May 15, 2026
2be6862
fix/refactor(webrtc): re-shape shutdown procedure
gab8i May 18, 2026
b3f7ed0
fix(webrtc): immediately reply with FIN_ACK after receiving FIN
gab8i May 20, 2026
18a9173
fix(webrtc): handle higher level drop SubstreamHandle
gab8i May 21, 2026
2908e57
fix(webrtc): don't downgrade Reset to SendClosed on StopSending
gab8i May 21, 2026
93e48b9
fix(webrtc): do not panic within `make_rtc_client`
gab8i May 21, 2026
1f1d229
fix(webrtc): close connection, not panic and report pending open fail…
gab8i May 21, 2026
9166876
fix(webrtc): process flags even when `Substream` is dropped
gab8i May 21, 2026
04dbf48
fix(webrtc): drive pending shutdown
gab8i May 21, 2026
90aa132
refactor(webrtc): rework Substream <-> SubstreamHandle communication …
gab8i May 28, 2026
64a5c4b
Merge branch 'master' into gab_webrtc_multiple_fixes_v2
gab8i May 28, 2026
475cb1b
test(webrtc): clippy
gab8i May 28, 2026
1b44a5f
fix(webrtc): avoid duration underflow
gab8i May 29, 2026
e11314f
refactor(webrtc): remove redundand writer_state check
gab8i May 29, 2026
5b25dd9
chore(webrtc): remove ice pass from debug trace
gab8i May 29, 2026
46259b1
fix(webrtc): rtc channel config, ordered and reliable messages
gab8i May 29, 2026
0494c1d
fix(webrtc): remove useless rct.handle_intput with timeout call
gab8i May 29, 2026
9db4a8e
refactor(webrtc): lock free shared state
gab8i May 29, 2026
f2f3259
Merge branch 'master' into gab_webrtc_multiple_fixes_v2
gab8i Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/protocol/notification/negotiation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum HandshakeEvent {
}

/// Outbound substream's handshake state
#[derive(Debug)]
enum HandshakeState {
/// Send handshake to remote peer.
SendHandshake,
Expand Down Expand Up @@ -218,6 +219,13 @@ impl Stream for HandshakeService {
inner.substreams.iter_mut()
{
if let Poll::Ready(()) = timer.poll_unpin(cx) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?direction,
?state,
"handshake negotiation timed out",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
Expand Down Expand Up @@ -285,10 +293,36 @@ impl Stream for HandshakeService {
},
HandshakeState::ReadHandshake => match pinned.poll_next(cx) {
Poll::Ready(Some(Ok(handshake))) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
handshake_len = handshake.len(),
"successfully read handshake from substream",
);
inner.ready.push_back((*peer, *direction, handshake.freeze().into()));
continue 'outer;
}
Poll::Ready(Some(Err(_))) | Poll::Ready(None) => {
Poll::Ready(Some(Err(error))) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?error,
"error reading handshake from substream",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
peer: *peer,
direction: *direction,
},
)));
}
Poll::Ready(None) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
"substream closed while reading handshake",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
Expand Down
148 changes: 130 additions & 18 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
transport::{
webrtc::{
schema::webrtc::message::Flag,
substream::{Event as SubstreamEvent, Substream as WebRtcSubstream, SubstreamHandle},
substream::{Message, Substream as WebRtcSubstream, SubstreamHandle},
util::{extract_framed_message, WebRtcMessage},
},
Endpoint,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl SubstreamHandleSet {
}

impl Stream for SubstreamHandleSet {
type Item = (ChannelId, Option<SubstreamEvent>);
type Item = (ChannelId, Option<Message>);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let len = match self.handles.len() {
Expand Down Expand Up @@ -465,8 +465,50 @@ impl WebRtcConnection {
"channel closed",
);

self.pending_outbound.remove(&channel_id);
self.channels.remove(&channel_id);
// If this was a pending outbound channel (waiting for DCEP ACK from remote),
// report the failure so the protocol handler can retry.
if let Some(context) = self.pending_outbound.remove(&channel_id) {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
protocol = %context.protocol,
substream_id = ?context.substream_id,
"outbound channel closed before opening, reporting failure",
);

let _ = self
.protocol_set
.report_substream_open_failure(
context.protocol,
context.substream_id,
SubstreamError::ConnectionClosed,
)
.await;
}

if let Some(ChannelState::OutboundOpening { context, .. }) =
self.channels.remove(&channel_id)
{
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
protocol = %context.protocol,
substream_id = ?context.substream_id,
"outbound channel closed during negotiation, reporting failure",
);

let _ = self
.protocol_set
.report_substream_open_failure(
context.protocol,
context.substream_id,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a different substream ID from the one at pending_outbound?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is stored within pending_outbound differs from what is stored within the channels field. A channel is initially opened and ends up in pending_outbound with a substream_id, once the WebRTC data channel is created, it is moved into the channels field, where the state OutboundOpening implies that the multistream-select protocol still needs to happen. These are the same channel in two different phases.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we here call report_substream_open_failure twice from the higher-level protocols perspective?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so?

on_open_substream -> inserts pending_outbound item
on_channel_opened -> remove item from pending_outbound and inserts it as ChannelState::OutboundOpening within the channels field

While report_substream_open_failure is getting called only while negotiating the multistream-select protocol and while a channel is getting closed, within neither of the two it seems to be the case of it being called twice over the same substream id

SubstreamError::ConnectionClosed,
)
.await;
}

self.pending_messages.remove(&channel_id);
self.handles.remove(&channel_id);
self.recv_buffers.remove(&channel_id);
Expand Down Expand Up @@ -501,6 +543,7 @@ impl WebRtcConnection {
"handle opening inbound substream",
);

// Decode errors are not recoverable.
let payload = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?;
let protocols = self.protocol_set.protocols_with_keep_alives();
let protocol_names = protocols.keys().cloned().collect();
Expand Down Expand Up @@ -889,6 +932,7 @@ impl WebRtcConnection {

self.rtc.direct_api().close_data_channel(channel_id);
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to advance state: Arc<Mutex<State>> into Reset here?

Otherwise the poll_shutdown would always return Poll::Pending:

        if matches!(*self.state.lock(), State::FinAcked | State::Reset) {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }

And this could stall further substream.close().await calls indefinetely

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the easiest fix would be to set the state inside SubstreamHandle::drop

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, tbf I honestly thought that poll_shutdown would have been stop to be called automatically if Substream was drop, but now that I think about it, it is more plausible it is the other way around: the Substream doesn't get drop until the poll_shutdown doesn't complete.

As you said, there could be an additional flag, something like ForceStop which just implies that something higher in the stack is forcing this substream to close.

}
},
ChannelState::Closing => {
Expand Down Expand Up @@ -972,6 +1016,29 @@ impl WebRtcConnection {
"connection closed",
);

let mut report_failure = async |context: &ChannelContext| {
let _ = self
.protocol_set
.report_substream_open_failure(
context.protocol.clone(),
context.substream_id,
SubstreamError::ConnectionClosed,
)
.await;
};

// Drain pending outbound opens (data channel not yet acked).
for (_, context) in self.pending_outbound.drain() {
report_failure(&context).await;
}

// Drain channels still in OutboundOpening (multistream-select in flight).
for (_, state) in self.channels.drain() {
if let ChannelState::OutboundOpening { context, .. } = state {
report_failure(&context).await;
}
}

let _ = self
.protocol_set
.report_connection_closed(self.peer, self.endpoint.connection_id())
Expand Down Expand Up @@ -1004,7 +1071,26 @@ impl WebRtcConnection {
"transmit data",
);

self.socket.try_send_to(&v.contents, v.destination).unwrap();
if let Err(error) = self.socket.try_send_to(&v.contents, v.destination) {
if error.kind() == std::io::ErrorKind::WouldBlock {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
destination = ?v.destination,
"UDP send buffer full, dropping datagram (str0m will retransmit)",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see it, str0m will retransmit after the hole in the sequence numbers is detected, leading to unordered delivery of packets and broken application data. Instead it's better to pause (backpressure) a producer and send all UDP packets in order they are produced by str0m.

Can be a follow-up PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that what you described is really needed.
The flow of messages producer -> str0m is different from str0m -> udp socket.
The latter benefits from all the guarantees that str0m gives, especially given how str0m is created:

  • ordered: false
  • reliability: Default::default() → Reliability::Reliable

So the channel is reliable: dropping an outgoing UDP datagram cannot lose application data, since SCTP guarantees retransmission.

The producer backpressure implemented by #575 was related to the first flow, messages that have not yet entered str0m, where it is up to us to control the order of things.

If we shift the focus from correctness to efficiency, then yes, it would be better to have a queue of pending UDP packets, because relying on str0m to recover here could be pretty slow.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading now #586 (comment) I think I got confused about how the 'ordered' flag works within str0m

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropping an outgoing UDP datagram cannot lose application data

It doesn't "lose" it in the precise sense, but because the packets arrive not in order now, and we do not reorder them on the application protocol level, the actual stream data will be garbage.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't know why libp2p spec is using unordered delivery. This is not what we get with TCP/WSS connections.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, if I'm not mistaken, to follow golibp2p behavior, ordered should be set to true (which is also the default one) so that the peer is expected to hold upon retransmission of lost packet!

This would imply that the current implementation is expected to work fine! Beside a possible optimization by manually re-transmitting the packet instead of waiting for str0m detecting it!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the last message was sent without reading your replies, gh didn't show them up to now.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, let's set ordered: true. The spec says the implementations MAY expose unordered, so we are free to use ordered as well:
https://github.com/libp2p/specs/blob/master/webrtc/README.md#ordering

);
} else {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
destination = ?v.destination,
?error,
"failed to send datagram, closing connection",
);
return self.on_connection_closed().await;
}
}

continue;
}
Output::Event(v) => match v {
Expand Down Expand Up @@ -1076,27 +1162,44 @@ impl WebRtcConnection {
},
};

let duration = timeout - Instant::now();
if duration.is_zero() {
self.rtc.handle_input(Input::Timeout(Instant::now())).unwrap();
continue;
}

tokio::select! {
biased;
datagram = self.dgram_rx.recv() => match datagram {
Some(datagram) => {
let contents = match datagram.as_slice().try_into() {
Ok(contents) => contents,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?error,
datagram_len = datagram.len(),
"failed to parse inbound datagram, closing connection",
);

return self.on_connection_closed().await;
}
};

let input = Input::Receive(
Instant::now(),
Receive {
proto: Str0mProtocol::Udp,
source: self.peer_address,
destination: self.local_address,
contents: datagram.as_slice().try_into().unwrap(),
contents,
},
);

self.rtc.handle_input(input).unwrap();
if let Err(error) = self.rtc.handle_input(input) {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?error,
"str0m rejected inbound datagram, closing connection",
);
return self.on_connection_closed().await;
}
}
None => {
tracing::trace!(
Expand All @@ -1121,7 +1224,7 @@ impl WebRtcConnection {
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
}
Some((channel_id, Some(SubstreamEvent::Message { payload, flag }))) => {
Some((channel_id, Some(Message { payload, flag }))) => {
if let Err(error) = self.on_outbound_data(channel_id, payload, flag) {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -1131,11 +1234,11 @@ impl WebRtcConnection {
"failed to send data to remote peer",
);

self.channels.insert(channel_id, ChannelState::Closing);
self.rtc.direct_api().close_data_channel(channel_id);
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
}
}
Some((_, Some(SubstreamEvent::RecvClosed))) => {}
},
command = self.protocol_set.next() => match command {
None | Some(ProtocolCommand::ForceClose) => {
Expand Down Expand Up @@ -1178,8 +1281,17 @@ impl WebRtcConnection {
);
}
},
_ = tokio::time::sleep(duration) => {
self.rtc.handle_input(Input::Timeout(Instant::now())).unwrap();
_ = tokio::time::sleep(timeout.saturating_duration_since(Instant::now())) => {
if let Err(error) = self.rtc.handle_input(Input::Timeout(Instant::now())) {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?error,
"str0m rejected timeout input, closing connection",
);

return self.on_connection_closed().await;
}
}
}
}
Expand Down
Loading
Loading