Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ use crate::{
webrtc::{
schema::webrtc::message::Flag,
substream::{Event as SubstreamEvent, Substream as WebRtcSubstream, SubstreamHandle},
util::WebRtcMessage,
util::{extract_framed_message, WebRtcMessage},
},
Endpoint,
},
types::{protocol::ProtocolName, SubstreamId},
PeerId,
};

use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt};
use indexmap::IndexMap;
use str0m::{
Expand Down Expand Up @@ -210,6 +211,18 @@ pub struct WebRtcConnection {

/// Substream handles.
handles: SubstreamHandleSet,

/// Inbound noise-channel byte buffer for reassembling pbio-delimited frames.
Comment thread
lexnv marked this conversation as resolved.
Outdated
///
/// The libp2p-go msgio implementation issues two separate `Write` calls:
/// - variant length
/// - protobuf body
///
/// These will become two distinct SCTP messages on the data channel.
///
/// Accumulate raw bytes here and only attempt protobuf decode once a
/// full `varint length ++ body` frame is available.
recv_buffers: HashMap<ChannelId, BytesMut>,
}

impl WebRtcConnection {
Expand All @@ -236,6 +249,7 @@ impl WebRtcConnection {
pending_outbound: HashMap::new(),
channels: HashMap::new(),
handles: SubstreamHandleSet::new(),
recv_buffers: HashMap::new(),
}
}

Expand Down Expand Up @@ -310,6 +324,7 @@ impl WebRtcConnection {
self.pending_outbound.remove(&channel_id);
self.channels.remove(&channel_id);
self.handles.remove(&channel_id);
self.recv_buffers.remove(&channel_id);

Ok(())
}
Expand All @@ -331,7 +346,7 @@ impl WebRtcConnection {
async fn on_inbound_opening_channel_data(
&mut self,
channel_id: ChannelId,
data: Vec<u8>,
data: Bytes,
header_received: bool,
) -> crate::Result<Option<(SubstreamId, SubstreamHandle, Option<Permit>)>> {
tracing::trace!(
Expand Down Expand Up @@ -419,7 +434,7 @@ impl WebRtcConnection {
async fn on_outbound_opening_channel_data(
&mut self,
channel_id: ChannelId,
data: Vec<u8>,
data: Bytes,
mut dialer_state: WebRtcDialerState,
context: ChannelContext,
) -> Result<Option<(SubstreamId, SubstreamHandle)>, SubstreamError> {
Expand Down Expand Up @@ -565,8 +580,9 @@ impl WebRtcConnection {
async fn on_open_channel_data(
&mut self,
channel_id: ChannelId,
data: Vec<u8>,
data: Bytes,
) -> crate::Result<()> {
// Decode errors are not recoverable.
let message = WebRtcMessage::decode(&data)?;

tracing::debug!(
Expand Down Expand Up @@ -595,6 +611,13 @@ impl WebRtcConnection {
}

/// Handle data received from a channel.
///
/// Bytes are accumulated in a per-channel buffer and only handed to the per-state
/// dispatcher once a complete `varint length ++ protobuf body` frame is available.
///
/// This handles peers (go-libp2p's pbio writer) that split varint and body
/// across two SCTP messages, while remaining a no-op for peers that send the whole
/// frame in one message (smoldot).
async fn on_inbound_data(&mut self, channel_id: ChannelId, data: Vec<u8>) -> crate::Result<()> {
tracing::debug!(
target: LOG_TARGET,
Expand All @@ -605,6 +628,31 @@ impl WebRtcConnection {
"received channel data",
);

self.recv_buffers.entry(channel_id).or_default().extend_from_slice(&data);

loop {
let Some(buffer) = self.recv_buffers.get_mut(&channel_id) else {
return Ok(());
};

let Some(body) = extract_framed_message(buffer)? else {
return Ok(());
};

self.dispatch_framed_message(channel_id, body).await?;
// If the channel was closed/removed during dispatch, stop draining its buffer.
if !self.channels.contains_key(&channel_id) {
return Ok(());
}
}
}

/// Dispatch a single reassembled protobuf body to the per-channel-state handler.
async fn dispatch_framed_message(
&mut self,
channel_id: ChannelId,
data: Bytes,
) -> crate::Result<()> {
let Some(state) = self.channels.remove(&channel_id) else {
tracing::warn!(
target: LOG_TARGET,
Expand Down
51 changes: 45 additions & 6 deletions src/transport/webrtc/opening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
use crate::{
config::Role,
crypto::{ed25519::Keypair, noise::NoiseContext},
transport::{webrtc::util::WebRtcMessage, Endpoint},
transport::{
webrtc::util::{extract_framed_message, WebRtcMessage},
Endpoint,
},
types::ConnectionId,
Error, PeerId,
};

use bytes::BytesMut;
use multiaddr::{Multiaddr, Protocol};
use multihash_codetable::Code;
use str0m::{
Expand Down Expand Up @@ -111,6 +115,18 @@ pub struct OpeningWebRtcConnection {

/// Local address.
local_address: SocketAddr,

/// Inbound noise-channel byte buffer for reassembling pbio-delimited frames.
Comment thread
lexnv marked this conversation as resolved.
Outdated
///
/// The libp2p-go msgio implementation issues two separate `Write` calls:
/// - variant length
/// - protobuf body
///
/// These will become two distinct SCTP messages on the data channel.
///
/// Accumulate raw bytes here and only attempt protobuf decode once a
/// full `varint length ++ body` frame is available.
noise_recv_buffer: BytesMut,
}

/// Connection state.
Expand Down Expand Up @@ -168,6 +184,7 @@ impl OpeningWebRtcConnection {
id_keypair,
peer_address,
local_address,
noise_recv_buffer: BytesMut::new(),
}
}

Expand Down Expand Up @@ -244,7 +261,28 @@ impl OpeningWebRtcConnection {
///
/// If the peer is accepted, [`OpeningWebRtcConnection::on_accept()`] is called which creates
/// the final Noise message and sends it to the remote peer, concluding the handshake.
fn on_noise_channel_data(&mut self, data: Vec<u8>) -> crate::Result<WebRtcEvent> {
fn on_noise_channel_data(&mut self, data: Vec<u8>) -> crate::Result<Option<WebRtcEvent>> {
tracing::trace!(
target: LOG_TARGET,
len = data.len(),
buffered = self.noise_recv_buffer.len(),
"noise channel data received",
);

self.noise_recv_buffer.extend_from_slice(&data);

let body = match extract_framed_message(&mut self.noise_recv_buffer)? {
Some(body) => body,
None => {
tracing::trace!(
target: LOG_TARGET,
buffered = self.noise_recv_buffer.len(),
"incomplete noise frame, waiting for more bytes",
Copy link
Copy Markdown
Collaborator

@dmitry-markin dmitry-markin Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to also log peerid/remote endpoint here and in other logs.

);
return Ok(None);
}
};

tracing::trace!(target: LOG_TARGET, "handle noise handshake reply");

let State::HandshakeSent { mut context } =
Expand All @@ -253,7 +291,7 @@ impl OpeningWebRtcConnection {
return Err(Error::InvalidState);
};

let message = WebRtcMessage::decode(&data)?.payload.ok_or(Error::InvalidData)?;
let message = WebRtcMessage::decode(&body)?.payload.ok_or(Error::InvalidData)?;
let remote_peer_id = context.get_remote_peer_id(&message)?;

tracing::trace!(
Expand Down Expand Up @@ -283,10 +321,10 @@ impl OpeningWebRtcConnection {
.with(Protocol::Certhash(certificate))
.with(Protocol::P2p(remote_peer_id.into()));

Ok(WebRtcEvent::ConnectionOpened {
Ok(Some(WebRtcEvent::ConnectionOpened {
peer: remote_peer_id,
endpoint: Endpoint::listener(address, self.connection_id),
})
}))
}

/// Accept connection by sending the final Noise handshake message
Expand Down Expand Up @@ -436,7 +474,8 @@ impl OpeningWebRtcConnection {
}

match self.on_noise_channel_data(data.data) {
Ok(event) => return event,
Ok(Some(event)) => return event,
Ok(None) => continue,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
Expand Down
8 changes: 4 additions & 4 deletions src/transport/webrtc/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
transport::webrtc::{schema::webrtc::message::Flag, util::WebRtcMessage},
transport::webrtc::{
schema::webrtc::message::Flag,
util::{WebRtcMessage, MAX_FRAME_SIZE},
},
Error,
};

Expand All @@ -36,9 +39,6 @@ use std::{
time::Duration,
};

/// Maximum frame size.
const MAX_FRAME_SIZE: usize = 16384;

/// Timeout for waiting on FIN_ACK after sending FIN.
/// Matches go-libp2p's 5 second stream close timeout.
const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down
Loading
Loading