-
Notifications
You must be signed in to change notification settings - Fork 35
fix(webrtx): ICE issue + substream shutdown procedure #586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
d59cb4c
086ac4d
624fccb
1dfbe6e
2be6862
b3f7ed0
18a9173
2908e57
93e48b9
1f1d229
9166876
04dbf48
90aa132
64a5c4b
475cb1b
1b44a5f
e11314f
5b25dd9
46259b1
0494c1d
9db4a8e
f2f3259
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,7 +29,7 @@ use crate::{ | |
| transport::{ | ||
| webrtc::{ | ||
| schema::webrtc::message::Flag, | ||
| substream::{Event as SubstreamEvent, Substream as WebRtcSubstream, SubstreamHandle}, | ||
| substream::{Message, Substream as WebRtcSubstream, SubstreamHandle}, | ||
| util::WebRtcMessage, | ||
| }, | ||
| Endpoint, | ||
|
|
@@ -144,7 +144,7 @@ impl SubstreamHandleSet { | |
| } | ||
|
|
||
| impl Stream for SubstreamHandleSet { | ||
| type Item = (ChannelId, Option<SubstreamEvent>); | ||
| type Item = (ChannelId, Option<Message>); | ||
|
|
||
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
| let len = match self.handles.len() { | ||
|
|
@@ -451,8 +451,50 @@ impl WebRtcConnection { | |
| "channel closed", | ||
| ); | ||
|
|
||
| self.pending_outbound.remove(&channel_id); | ||
| self.channels.remove(&channel_id); | ||
| // If this was a pending outbound channel (waiting for DCEP ACK from remote), | ||
| // report the failure so the protocol handler can retry. | ||
| if let Some(context) = self.pending_outbound.remove(&channel_id) { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?channel_id, | ||
| protocol = %context.protocol, | ||
| substream_id = ?context.substream_id, | ||
| "outbound channel closed before opening, reporting failure", | ||
| ); | ||
|
|
||
| let _ = self | ||
| .protocol_set | ||
| .report_substream_open_failure( | ||
| context.protocol, | ||
| context.substream_id, | ||
| SubstreamError::ConnectionClosed, | ||
| ) | ||
| .await; | ||
| } | ||
|
|
||
| if let Some(ChannelState::OutboundOpening { context, .. }) = | ||
| self.channels.remove(&channel_id) | ||
| { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?channel_id, | ||
| protocol = %context.protocol, | ||
| substream_id = ?context.substream_id, | ||
| "outbound channel closed during negotiation, reporting failure", | ||
| ); | ||
|
|
||
| let _ = self | ||
| .protocol_set | ||
| .report_substream_open_failure( | ||
| context.protocol, | ||
| context.substream_id, | ||
| SubstreamError::ConnectionClosed, | ||
| ) | ||
| .await; | ||
| } | ||
|
|
||
| self.pending_messages.remove(&channel_id); | ||
| self.handles.remove(&channel_id); | ||
|
|
||
|
|
@@ -486,6 +528,7 @@ impl WebRtcConnection { | |
| "handle opening inbound substream", | ||
| ); | ||
|
|
||
| // Decode errors are not recoverable. | ||
| let payload = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?; | ||
| let protocols = self.protocol_set.protocols_with_keep_alives(); | ||
| let protocol_names = protocols.keys().cloned().collect(); | ||
|
|
@@ -685,6 +728,7 @@ impl WebRtcConnection { | |
| channel_id: ChannelId, | ||
| data: Vec<u8>, | ||
| ) -> crate::Result<()> { | ||
| // Decode errors are not recoverable. | ||
| let message = WebRtcMessage::decode(&data)?; | ||
|
|
||
| tracing::debug!( | ||
|
|
@@ -841,6 +885,7 @@ impl WebRtcConnection { | |
|
|
||
| self.rtc.direct_api().close_data_channel(channel_id); | ||
| self.channels.insert(channel_id, ChannelState::Closing); | ||
| self.handles.remove(&channel_id); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to advance Otherwise the if matches!(*self.state.lock(), State::FinAcked | State::Reset) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}And this could stall further
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe the easiest fix would be to set the state inside
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yes, tbf I honestly thought that poll_shutdown would have been stop to be called automatically if As you said, there could be an additional flag, something like |
||
| } | ||
| }, | ||
| ChannelState::Closing => { | ||
|
|
@@ -924,6 +969,29 @@ impl WebRtcConnection { | |
| "connection closed", | ||
| ); | ||
|
|
||
| let mut report_failure = async |context: &ChannelContext| { | ||
| let _ = self | ||
| .protocol_set | ||
| .report_substream_open_failure( | ||
| context.protocol.clone(), | ||
| context.substream_id, | ||
| SubstreamError::ConnectionClosed, | ||
| ) | ||
| .await; | ||
| }; | ||
|
|
||
| // Drain pending outbound opens (data channel not yet acked). | ||
| for (_, context) in self.pending_outbound.drain() { | ||
| report_failure(&context).await; | ||
| } | ||
|
|
||
| // Drain channels still in OutboundOpening (multistream-select in flight). | ||
| for (_, state) in self.channels.drain() { | ||
| if let ChannelState::OutboundOpening { context, .. } = state { | ||
| report_failure(&context).await; | ||
| } | ||
| } | ||
|
|
||
| let _ = self | ||
| .protocol_set | ||
| .report_connection_closed(self.peer, self.endpoint.connection_id()) | ||
|
|
@@ -956,7 +1024,26 @@ impl WebRtcConnection { | |
| "transmit data", | ||
| ); | ||
|
|
||
| self.socket.try_send_to(&v.contents, v.destination).unwrap(); | ||
| if let Err(error) = self.socket.try_send_to(&v.contents, v.destination) { | ||
| if error.kind() == std::io::ErrorKind::WouldBlock { | ||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| destination = ?v.destination, | ||
| "UDP send buffer full, dropping datagram (str0m will retransmit)", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I see it, Can be a follow-up PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that what you described is really needed.
So the channel is reliable: dropping an outgoing UDP datagram cannot lose application data, since SCTP guarantees retransmission. The producer backpressure implemented by #575 was related to the first flow, messages that have not yet entered str0m, where it is up to us to control the order of things. If we shift the focus from correctness to efficiency, then yes, it would be better to have a queue of pending UDP packets, because relying on str0m to recover here could be pretty slow.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading now #586 (comment) I think I got confused about how the 'ordered' flag works within str0m
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It doesn't "lose" it in the precise sense, but because the packets arrive not in order now, and we do not reorder them on the application protocol level, the actual stream data will be garbage.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I don't know why libp2p spec is using unordered delivery. This is not what we get with TCP/WSS connections.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, if I'm not mistaken, to follow golibp2p behavior, ordered should be set to true (which is also the default one) so that the peer is expected to hold upon retransmission of lost packet! This would imply that the current implementation is expected to work fine! Beside a possible optimization by manually re-transmitting the packet instead of waiting for str0m detecting it!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, the last message was sent without reading your replies, gh didn't show them up to now.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, let's set |
||
| ); | ||
| } else { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| destination = ?v.destination, | ||
| ?error, | ||
| "failed to send datagram, closing connection", | ||
| ); | ||
| return self.on_connection_closed().await; | ||
| } | ||
| } | ||
|
|
||
| continue; | ||
| } | ||
| Output::Event(v) => match v { | ||
|
|
@@ -1030,25 +1117,56 @@ impl WebRtcConnection { | |
|
|
||
| let duration = timeout - Instant::now(); | ||
|
gab8i marked this conversation as resolved.
Outdated
|
||
| if duration.is_zero() { | ||
| self.rtc.handle_input(Input::Timeout(Instant::now())).unwrap(); | ||
| if let Err(error) = self.rtc.handle_input(Input::Timeout(Instant::now())) { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?error, | ||
| "str0m rejected timeout input, closing connection", | ||
| ); | ||
| return self.on_connection_closed().await; | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading the str0m docs, I don't think we need to pass Given the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are totally right, doc: What does this mean is that we can safely remove this special case. I just don't follow why we should move
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I would prioritize pumping the network traffic over processing the commands. This can be triggered under load when multiple futures resolve at the same time, and in this case we want to free/process the network buffers first before processing the user commands.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current approach won't break anything, but we can try to process user commands only to discover we are backpressured by networking, then process networking, and only then process commands. With prioritizing networking we spare one poll cycle in such situation. |
||
| continue; | ||
| } | ||
|
|
||
| tokio::select! { | ||
| biased; | ||
| datagram = self.dgram_rx.recv() => match datagram { | ||
| Some(datagram) => { | ||
| let contents = match datagram.as_slice().try_into() { | ||
| Ok(contents) => contents, | ||
| Err(error) => { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?error, | ||
| datagram_len = datagram.len(), | ||
| "failed to parse inbound datagram, closing connection", | ||
| ); | ||
|
|
||
| return self.on_connection_closed().await; | ||
| } | ||
| }; | ||
|
|
||
| let input = Input::Receive( | ||
| Instant::now(), | ||
| Receive { | ||
| proto: Str0mProtocol::Udp, | ||
| source: self.peer_address, | ||
| destination: self.local_address, | ||
| contents: datagram.as_slice().try_into().unwrap(), | ||
| contents, | ||
| }, | ||
| ); | ||
|
|
||
| self.rtc.handle_input(input).unwrap(); | ||
| if let Err(error) = self.rtc.handle_input(input) { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?error, | ||
| "str0m rejected inbound datagram, closing connection", | ||
| ); | ||
| return self.on_connection_closed().await; | ||
| } | ||
| } | ||
| None => { | ||
| tracing::trace!( | ||
|
|
@@ -1073,7 +1191,7 @@ impl WebRtcConnection { | |
| self.channels.insert(channel_id, ChannelState::Closing); | ||
| self.handles.remove(&channel_id); | ||
| } | ||
| Some((channel_id, Some(SubstreamEvent::Message { payload, flag }))) => { | ||
| Some((channel_id, Some(Message { payload, flag }))) => { | ||
| if let Err(error) = self.on_outbound_data(channel_id, payload, flag) { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
|
|
@@ -1083,11 +1201,11 @@ impl WebRtcConnection { | |
| "failed to send data to remote peer", | ||
| ); | ||
|
|
||
| self.channels.insert(channel_id, ChannelState::Closing); | ||
| self.rtc.direct_api().close_data_channel(channel_id); | ||
| self.channels.insert(channel_id, ChannelState::Closing); | ||
| self.handles.remove(&channel_id); | ||
| } | ||
| } | ||
| Some((_, Some(SubstreamEvent::RecvClosed))) => {} | ||
| }, | ||
| command = self.protocol_set.next() => match command { | ||
| None | Some(ProtocolCommand::ForceClose) => { | ||
|
|
@@ -1131,7 +1249,16 @@ impl WebRtcConnection { | |
| } | ||
| }, | ||
| _ = tokio::time::sleep(duration) => { | ||
| self.rtc.handle_input(Input::Timeout(Instant::now())).unwrap(); | ||
| if let Err(error) = self.rtc.handle_input(Input::Timeout(Instant::now())) { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| peer = ?self.peer, | ||
| ?error, | ||
| "str0m rejected timeout input, closing connection", | ||
| ); | ||
|
|
||
| return self.on_connection_closed().await; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a different substream ID from the one at
pending_outbound?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is stored within pending_outbound differs from what is stored within the channels field. A channel is initially opened and ends up in pending_outbound with a substream_id, once the WebRTC data channel is created, it is moved into the channels field, where the state OutboundOpening implies that the multistream-select protocol still needs to happen. These are the same channel in two different phases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't we here call
report_substream_open_failuretwice from the higher-level protocols perspective?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why so?
on_open_substream-> insertspending_outbounditemon_channel_opened-> remove item frompending_outboundand inserts it asChannelState::OutboundOpeningwithin thechannelsfieldWhile
report_substream_open_failureis getting called only while negotiating the multistream-select protocol and while a channel is getting closed, within neither of the two it seems to be the case of it being called twice over the same substream id