Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
800 changes: 415 additions & 385 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ rcgen = { version = "0.14.5", optional = true }
# End of Quic related dependencies.

# WebRTC related dependencies. WebRTC is an experimental feature flag. The dependencies must be updated.
str0m = { version = "0.11.1", optional = true }
str0m = { git = "https://github.com/algesten/str0m", rev = "c1e24a5cf408426bd37f1a556dd2c40db679710a", optional = true }
# End of WebRTC related dependencies.

# Fuzzing related dependencies.
Expand Down
36 changes: 35 additions & 1 deletion src/protocol/notification/negotiation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub enum HandshakeEvent {
}

/// Outbound substream's handshake state
#[derive(Debug)]
enum HandshakeState {
/// Send handshake to remote peer.
SendHandshake,
Expand Down Expand Up @@ -218,6 +219,13 @@ impl Stream for HandshakeService {
inner.substreams.iter_mut()
{
if let Poll::Ready(()) = timer.poll_unpin(cx) {
tracing::trace!(
target: LOG_TARGET,
?peer,
?direction,
?state,
"handshake negotiation timed out",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
Expand Down Expand Up @@ -285,10 +293,36 @@ impl Stream for HandshakeService {
},
HandshakeState::ReadHandshake => match pinned.poll_next(cx) {
Poll::Ready(Some(Ok(handshake))) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
handshake_len = handshake.len(),
"successfully read handshake from substream",
);
inner.ready.push_back((*peer, *direction, handshake.freeze().into()));
continue 'outer;
}
Poll::Ready(Some(Err(_))) | Poll::Ready(None) => {
Poll::Ready(Some(Err(error))) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?error,
"error reading handshake from substream",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
peer: *peer,
direction: *direction,
},
)));
}
Poll::Ready(None) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
"substream closed while reading handshake",
);
return Poll::Ready(Some((
*peer,
HandshakeEvent::NegotiationError {
Expand Down
9 changes: 6 additions & 3 deletions src/protocol/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,12 @@ impl RequestResponseProtocol {
?request_id,
"timed out while sending response",
),
Ok(Ok(_)) => feedback.take().map_or((), |feedback| {
let _ = feedback.send(());
}),
Ok(Ok(_)) => {
let _ = substream.close().await;
if let Some(feedback) = feedback.take() {
let _ = feedback.send(());
}
}
Ok(Err(error)) => tracing::trace!(
target: LOG_TARGET,
?peer,
Expand Down
177 changes: 160 additions & 17 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,50 @@ impl WebRtcConnection {
"channel closed",
);

self.pending_outbound.remove(&channel_id);
self.channels.remove(&channel_id);
// If this was a pending outbound channel (waiting for DCEP ACK from remote),
// report the failure so the protocol handler can retry.
if let Some(context) = self.pending_outbound.remove(&channel_id) {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
protocol = %context.protocol,
substream_id = ?context.substream_id,
"outbound channel closed before opening, reporting failure",
);

let _ = self
.protocol_set
.report_substream_open_failure(
context.protocol,
context.substream_id,
SubstreamError::ConnectionClosed,
)
.await;
}

if let Some(ChannelState::OutboundOpening { context, .. }) =
self.channels.remove(&channel_id)
{
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
protocol = %context.protocol,
substream_id = ?context.substream_id,
"outbound channel closed during negotiation, reporting failure",
);

let _ = self
.protocol_set
.report_substream_open_failure(
context.protocol,
context.substream_id,
SubstreamError::ConnectionClosed,
)
.await;
}

self.handles.remove(&channel_id);

Ok(())
Expand Down Expand Up @@ -341,7 +383,47 @@ impl WebRtcConnection {
"handle opening inbound substream",
);

let payload = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?;
let rtc_message = WebRtcMessage::decode(&data)?;

// During negotiation the remote may send protobuf messages that carry only a flag
// (e.g. FIN, RESET_STREAM) with no multistream-select payload, or an empty payload
// alongside a flag. Treat close-related flags as fatal and skip empty payloads so
// the channel stays open for the next protocol proposal.
match (&rtc_message.flag, &rtc_message.payload) {
(Some(Flag::ResetStream | Flag::StopSending), _) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
flag = ?rtc_message.flag,
"remote sent close flag during inbound negotiation",
);
return Err(Error::InvalidState);
}
(_, None) => {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
flag = ?rtc_message.flag,
"ignoring empty payload during inbound negotiation, waiting for next message",
);
return Ok(None);
}
(_, Some(p)) if p.is_empty() => {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
flag = ?rtc_message.flag,
"ignoring empty payload during inbound negotiation, waiting for next message",
);
return Ok(None);
}
_ => {}
}

let payload = rtc_message.payload.expect("non-empty payload checked above");
let protocols = self.protocol_set.protocols_with_keep_alives();
let protocol_names = protocols.keys().cloned().collect();
let (response, negotiated) =
Expand Down Expand Up @@ -717,12 +799,17 @@ impl WebRtcConnection {
}

/// Handle outbound data with optional flag.
///
/// Returns `Ok(true)` if backpressure should be applied (buffer too full),
/// `Ok(false)` if write succeeded, or `Err` on failure.
fn on_outbound_data(
&mut self,
channel_id: ChannelId,
data: Vec<u8>,
flag: Option<Flag>,
) -> crate::Result<()> {
) -> crate::Result<bool> {
let mut channel = self.rtc.channel(channel_id).ok_or(Error::ChannelDoesntExist)?;

tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
Expand All @@ -732,12 +819,56 @@ impl WebRtcConnection {
"send data",
);

self.rtc
.channel(channel_id)
.ok_or(Error::ChannelDoesntExist)?
let accepted = channel
.write(true, WebRtcMessage::encode(data, flag).as_ref())
.map_err(Error::WebRtc)
.map(|_| ())
.map_err(Error::WebRtc)?;

if !accepted {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"backpressure applied, str0m write buffer full",
);
}

Ok(!accepted)
}

/// Retry pending writes on all backpressured channels.
///
/// Called after `Output::Transmit` when data has left the global buffer,
/// giving previously rejected writes a chance to succeed.
fn retry_pending_writes(&mut self) {
let channel_ids: Vec<_> = self.handles.handles.keys().copied().collect();

for channel_id in channel_ids {
let pending = self.handles.get_mut(&channel_id).and_then(|h| h.take_pending());
let Some(SubstreamEvent::Message { payload, flag }) = pending else {
continue;
};

match self.on_outbound_data(channel_id, payload.clone(), flag) {
Ok(false) =>
if let Some(handle) = self.handles.get_mut(&channel_id) {
handle.set_backpressure(false);
},
Ok(true) =>
if let Some(handle) = self.handles.get_mut(&channel_id) {
handle.set_backpressure(true);
handle.queue_pending(SubstreamEvent::Message { payload, flag });
},
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
?error,
"failed to retry pending write",
);
}
}
}
}

/// Open outbound substream.
Expand Down Expand Up @@ -820,6 +951,7 @@ impl WebRtcConnection {
);

self.socket.try_send_to(&v.contents, v.destination).unwrap();
self.retry_pending_writes();
continue;
}
Output::Event(v) => match v {
Expand Down Expand Up @@ -870,6 +1002,7 @@ impl WebRtcConnection {

continue;
}
Event::ChannelBufferedAmountLow(_) => continue,
event => {
tracing::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -928,14 +1061,24 @@ impl WebRtcConnection {
self.handles.remove(&channel_id);
}
Some((channel_id, Some(SubstreamEvent::Message { payload, flag }))) => {
if let Err(error) = self.on_outbound_data(channel_id, payload, flag) {
tracing::debug!(
target: LOG_TARGET,
?channel_id,
?flag,
?error,
"failed to send data to remote peer",
);
match self.on_outbound_data(channel_id, payload.clone(), flag) {
Ok(false) => {} // Write succeeded
Ok(true) => {
// Backpressure - queue message and signal handle
if let Some(handle) = self.handles.get_mut(&channel_id) {
handle.set_backpressure(true);
handle.queue_pending(SubstreamEvent::Message { payload, flag });
}
}
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?channel_id,
?flag,
?error,
"failed to send data to remote peer",
);
}
}
}
Some((_, Some(SubstreamEvent::RecvClosed))) => {}
Expand Down
Loading