Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 73 additions & 26 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,10 @@ pub(crate) fn is_cancel_error(res: &Result<RequestResult, ClientError>) -> bool
matches!(res, Err(ClientError::Deadline)) || is_too_many_open_files(res)
}

pub(crate) fn try_send_report<T>(report_tx: &kanal::Sender<T>, report: T) -> bool {
report_tx.send(report).is_ok()
}

/// Check error was "Too many open file"
fn is_too_many_open_files(res: &Result<RequestResult, ClientError>) -> bool {
res.as_ref()
Expand Down Expand Up @@ -1075,7 +1079,9 @@ async fn work_http2_once(
if let Some(start_latency_correction) = start_latency_correction {
set_start_latency_correction(&mut res, start_latency_correction);
}
report_tx.send(res).unwrap();
if !try_send_report(report_tx, res) {

@ahmed-mekky ahmed-mekky Jun 18, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why didn't you just do

if !report_tx.send(res).is_ok() {

the function looks like an unneeded layer of abstraction here

return (true, is_reconnect);
}
(is_cancel, is_reconnect)
}

Expand Down Expand Up @@ -1220,7 +1226,9 @@ pub async fn work(
}
Err(err) => {
if counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand All @@ -1245,7 +1253,9 @@ pub async fn work(
while counter.fetch_add(1, Ordering::Relaxed) < n_tasks {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel {
break;
}
Expand Down Expand Up @@ -1386,7 +1396,9 @@ pub async fn work_with_qps(
Err(err) => {
// Consume a task
if let Ok(()) = rx.recv().await {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand All @@ -1413,7 +1425,9 @@ pub async fn work_with_qps(
while let Ok(()) = rx.recv().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel {
break;
}
Expand Down Expand Up @@ -1560,7 +1574,9 @@ pub async fn work_with_qps_latency_correction(
Err(err) => {
// Consume a task
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand Down Expand Up @@ -1588,7 +1604,9 @@ pub async fn work_with_qps_latency_correction(
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel {
break;
}
Expand Down Expand Up @@ -1697,7 +1715,9 @@ pub async fn work_until(
}
}
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
if !try_send_report(&report_tx, Err(ClientError::Deadline)) {
return;
}
connection_gone = true;
}
}
Expand All @@ -1708,7 +1728,9 @@ pub async fn work_until(
}

Err(err) => {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
break;
}
if s.is_closed() {
break;
}
Expand Down Expand Up @@ -1739,7 +1761,9 @@ pub async fn work_until(
loop {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel || is_end.load(Relaxed) {
break;
}
Expand All @@ -1760,8 +1784,9 @@ pub async fn work_until(
f.abort();
if let Err(e) = f.await
&& e.is_cancelled()
&& !try_send_report(&report_tx, Err(ClientError::Deadline))
{
report_tx.send(Err(ClientError::Deadline)).unwrap();
break;
}
}
}
Expand Down Expand Up @@ -1901,7 +1926,9 @@ pub async fn work_until_with_qps(
}
}
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
if !try_send_report(&report_tx, Err(ClientError::Deadline)) {
return;
}
connection_gone = true;
}
}
Expand All @@ -1913,7 +1940,9 @@ pub async fn work_until_with_qps(
Err(err) => {
// Consume a task
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand Down Expand Up @@ -1949,7 +1978,9 @@ pub async fn work_until_with_qps(
while let Ok(()) = rx.recv().await {
let res = client.work_http1(&mut client_state).await;
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel || is_end.load(Relaxed) {
break;
}
Expand All @@ -1970,8 +2001,9 @@ pub async fn work_until_with_qps(
f.abort();
if let Err(e) = f.await
&& e.is_cancelled()
&& !try_send_report(&report_tx, Err(ClientError::Deadline))
{
report_tx.send(Err(ClientError::Deadline)).unwrap();
break;
}
}
}
Expand Down Expand Up @@ -2109,7 +2141,9 @@ pub async fn work_until_with_qps_latency_correction(
}
}
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
if !try_send_report(&report_tx, Err(ClientError::Deadline)) {
return;
}
connection_gone = true;
}
}
Expand All @@ -2121,7 +2155,9 @@ pub async fn work_until_with_qps_latency_correction(

Err(err) => {
if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand Down Expand Up @@ -2158,7 +2194,9 @@ pub async fn work_until_with_qps_latency_correction(
let mut res = client.work_http1(&mut client_state).await;
set_start_latency_correction(&mut res, start);
let is_cancel = is_cancel_error(&res);
report_tx.send(res).unwrap();
if !try_send_report(&report_tx, res) {
break;
}
if is_cancel || is_end.load(Relaxed) {
break;
}
Expand All @@ -2179,8 +2217,9 @@ pub async fn work_until_with_qps_latency_correction(
f.abort();
if let Err(e) = f.await
&& e.is_cancelled()
&& !try_send_report(&report_tx, Err(ClientError::Deadline))
{
report_tx.send(Err(ClientError::Deadline)).unwrap();
break;
}
}
}
Expand All @@ -2200,7 +2239,7 @@ pub mod fast {
use crate::{
client::{
ClientError, ClientStateHttp1, ClientStateHttp2, HttpWorkType, is_cancel_error,
is_hyper_error, set_connection_time, setup_http2,
is_hyper_error, set_connection_time, setup_http2, try_send_report,
},
pcg64si::Pcg64Si,
result_data::ResultData,
Expand Down Expand Up @@ -2330,7 +2369,13 @@ pub mod fast {
}
};

report_tx.send(result_data).unwrap();
let sent = try_send_report(
&report_tx,
result_data,
);
if !sent {
return true;
}
is_cancel
})
})
Expand Down Expand Up @@ -2366,7 +2411,7 @@ pub mod fast {
}
}
if has_err {
report_tx.send(result_data_err).unwrap();
let _sent = try_send_report(&report_tx, result_data_err);
}
}));
}
Expand Down Expand Up @@ -2412,7 +2457,7 @@ pub mod fast {
}
} => {}
}
report_tx.send(result_data).unwrap();
let _sent = try_send_report(&report_tx, result_data);
}));
}
rt.block_on(local);
Expand Down Expand Up @@ -2547,7 +2592,9 @@ pub mod fast {
}
};

report_tx.send(result_data).unwrap();
if !try_send_report(&report_tx, result_data) {
return true;
}
is_cancel
})
})
Expand Down Expand Up @@ -2582,7 +2629,7 @@ pub mod fast {
}
}
if has_err {
report_tx.send(result_data_err).unwrap();
let _sent = try_send_report(&report_tx, result_data_err);
}
}));
}
Expand Down Expand Up @@ -2633,7 +2680,7 @@ pub mod fast {
result_data.push(Err(ClientError::Deadline));
}
}
report_tx.send(result_data).unwrap();
let _sent = try_send_report(&report_tx, result_data);
}));
}
rt.block_on(local);
Expand Down
20 changes: 14 additions & 6 deletions src/client_h3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum Http3Error {
use crate::client::QueryLimit;
use crate::client::{
Client, ClientError, ConnectionTime, RequestResult, Stream, is_cancel_error,
set_connection_time, set_start_latency_correction,
set_connection_time, set_start_latency_correction, try_send_report,
};
use crate::pcg64si::Pcg64Si;
use crate::result_data::ResultData;
Expand Down Expand Up @@ -341,7 +341,9 @@ async fn create_and_load_up_single_connection_http3(
}
}
_ = s.acquire() => {
report_tx.send(Err(ClientError::Deadline)).unwrap();
if !try_send_report(&report_tx, Err(ClientError::Deadline)) {
return;
}
connection_gone = true;
}
}
Expand All @@ -358,7 +360,9 @@ async fn create_and_load_up_single_connection_http3(
break;
// Consume a task
} else if rx.recv().await.is_ok() {
report_tx.send(Err(err)).unwrap();
if !try_send_report(&report_tx, Err(err)) {
return;
}
} else {
return;
}
Expand Down Expand Up @@ -426,7 +430,9 @@ pub(crate) async fn work_http3_once(
if let Some(start_latency_correction) = start_latency_correction {
set_start_latency_correction(&mut res, start_latency_correction);
}
report_tx.send(res).unwrap();
if !try_send_report(report_tx, res) {
return (true, is_reconnect);
}
(is_cancel, is_reconnect)
}

Expand Down Expand Up @@ -519,7 +525,9 @@ pub(crate) fn http3_connection_fast_work_until(
}
};

report_tx.send(result_data).unwrap();
if !try_send_report(&report_tx, result_data) {
return true;
}
is_cancel
})
})
Expand Down Expand Up @@ -559,7 +567,7 @@ pub(crate) fn http3_connection_fast_work_until(
}
}
if has_err {
report_tx.send(result_data_err).unwrap();
let _sent = try_send_report(&report_tx, result_data_err);
}
}));
}
Expand Down