Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions crates/xmtp_api_grpc/src/grpc_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ impl GrpcClient {
}

/// Builds a tonic request from a body and a generic HTTP Request
fn build_tonic_request(
fn build_tonic_request<B>(
&self,
request: http::request::Builder,
body: Bytes,
) -> Result<tonic::Request<Bytes>, Status> {
body: B,
) -> Result<tonic::Request<B>, Status> {
let request = request
.body(body)
.map_err(|e| tonic::Status::from_error(Box::new(e)))?;
Expand Down Expand Up @@ -187,6 +187,36 @@ impl Client for GrpcClient {
});
Ok(response.to_http().map(Into::into))
}

// Full-duplex needs a real HTTP/2 transport; the gRPC-Web service used on
// wasm cannot carry it, so the browser keeps the trait's default error.
#[cfg(not(target_arch = "wasm32"))]
async fn bidi_stream(
&self,
request: request::Builder,
path: PathAndQuery,
body: xmtp_common::BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
let this = self.clone();
// client requires to be moved so it lives long enough for streaming response future.
let response = async move {
let mut client = this.inner.clone();
this.wait_for_ready(&mut client).await?;
let request = this.build_tonic_request(request, body)?;
let codec = TransparentCodec::default();
client.streaming(request, path, codec).await
};
let req = crate::streams::NonBlockingStreamRequest::new(
Box::pin(response) as crate::streams::ResponseFuture
);
let response = crate::streams::send(req).await.map_err(GrpcError::from)?;
let response = response.map(|body| {
BytesStream::new(GrpcStream {
inner: EscapableTonicStream::new(body),
})
});
Ok(response.to_http().map(Into::into))
}
}

#[xmtp_common::async_trait]
Expand Down Expand Up @@ -311,7 +341,7 @@ pub mod tests {
let request = client
.build_tonic_request(
Default::default(),
PublishRequest { envelopes: vec![] }.encode_to_vec().into(),
prost::bytes::Bytes::from(PublishRequest { envelopes: vec![] }.encode_to_vec()),
)
.unwrap();

Expand Down
43 changes: 43 additions & 0 deletions crates/xmtp_proto/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ pub trait Client: MaybeSend + MaybeSync {
body: Bytes,
) -> Result<http::Response<BytesStream>, ApiClientError>;

/// Open a bidirectional stream (XIP-83). `body` is the outbound stream of
/// encoded protobuf messages (one `Bytes` item per message); the response
/// carries the inbound message stream. Transports without full-duplex
/// support (e.g. gRPC-Web in the browser) keep this default and error.
async fn bidi_stream(
&self,
request: request::Builder,
path: http::uri::PathAndQuery,
body: BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
let _ = (request, path, body);
Err(ApiClientError::OtherUnretryable(
"bidirectional streaming is not supported by this transport".into(),
))
}

/// start a "fake" stream that does not create a TCP connection and will always be pending
fn fake_stream(&self) -> http::Response<BytesStream> {
let fake = FakeEmptyStream::new();
Expand Down Expand Up @@ -179,6 +195,15 @@ where
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).stream(request, path, body).await
}

async fn bidi_stream(
&self,
request: request::Builder,
path: http::uri::PathAndQuery,
body: BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).bidi_stream(request, path, body).await
}
}

#[xmtp_common::async_trait]
Expand All @@ -203,6 +228,15 @@ where
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).stream(request, path, body).await
}

async fn bidi_stream(
&self,
request: request::Builder,
path: http::uri::PathAndQuery,
body: BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).bidi_stream(request, path, body).await
}
}

#[xmtp_common::async_trait]
Expand All @@ -227,6 +261,15 @@ where
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).stream(request, path, body).await
}

async fn bidi_stream(
&self,
request: request::Builder,
path: PathAndQuery,
body: BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
(**self).bidi_stream(request, path, body).await
}
}

#[xmtp_common::async_trait]
Expand Down
9 changes: 9 additions & 0 deletions crates/xmtp_proto/src/traits/boxed_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ where
) -> Result<http::Response<BytesStream>, ApiClientError> {
self.inner.stream(request, path, body).await
}

async fn bidi_stream(
&self,
request: request::Builder,
path: http::uri::PathAndQuery,
body: xmtp_common::BoxDynStream<'static, Bytes>,
) -> Result<http::Response<BytesStream>, ApiClientError> {
self.inner.bidi_stream(request, path, body).await
}
}

pub trait ToBoxedClient {
Expand Down
Loading