Wire cooperative cancellation into the reduce and QTF fetch native streams #22248

Merged
mch2 merged 5 commits into opensearch-project:main from mch2:pr/rust-cancel-and-qtf-fetch
Jun 21, 2026

@mch2 mch2 commented Jun 20, 2026

Member

Description

A cancelled coordinator-reduce or QTF fetch was either abort()-killed mid-send (skipping drop+drain, leaking the aggregate's GroupValues) or left with no cancel signal at all (stranding its stream_next task and DataFusion pool reservation). Make cancel cooperative.

  • cross_rt_stream: optional CancellationToken on the producer loop; a cancel select-breaks the loop and falls through to drop(stream)+drain instead of a JoinSet abort. Existing call sites pass None (unchanged). Covered by 3 cargo tests.
  • api.rs reduce paths: pass the token, do not register the abort handle.
  • query_executor.rs wrap_stream_as_handle (QTF fetch-by-rowid): build with the cancellable variant and register the abort + CPU runtime handles, matching execute_query.
  • AnalyticsSearchBackendPlugin.cancelByContext SPI (default no-op) + datafusion impl; AnalyticsSearchService registers a fetch task cancellation listener that fires it.
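
The cooperative-cancel shape in the first bullet can be sketched with the standard library alone. This is a simplified analogue, not the code in cross_rt_stream.rs: a thread and an AtomicBool stand in for the tokio task and CancellationToken, the drain step is omitted, and TrackedStream, run_producer, and cancel_and_check are hypothetical names. It shows a producer that checks the flag before each pull and while a send is pending, then falls through to drop(stream) so cleanup runs on every exit path.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the native stream; records when it is dropped.
struct TrackedStream {
    counter: u64,
    dropped: Arc<AtomicBool>,
}

impl Iterator for TrackedStream {
    type Item = u64;
    fn next(&mut self) -> Option<u64> {
        self.counter += 1;
        Some(self.counter) // unbounded: only a cancel or a closed receiver ends the loop
    }
}

impl Drop for TrackedStream {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Producer loop: the cancel flag is checked before each pull and while a send is pending.
/// On cancel the loop breaks and falls through to the explicit drop instead of being killed.
fn run_producer(mut stream: TrackedStream, tx: SyncSender<u64>, cancel: Arc<AtomicBool>) {
    'outer: loop {
        if cancel.load(Ordering::SeqCst) {
            break;
        }
        let Some(mut item) = stream.next() else { break };
        loop {
            match tx.try_send(item) {
                Ok(()) => break,
                Err(TrySendError::Disconnected(_)) => break 'outer,
                Err(TrySendError::Full(back)) => {
                    if cancel.load(Ordering::SeqCst) {
                        break 'outer;
                    }
                    item = back; // channel is full: keep the item and retry shortly
                    thread::sleep(Duration::from_millis(1));
                }
            }
        }
    }
    drop(stream); // cleanup runs whether the loop ended by cancel, disconnect, or exhaustion
}

/// Starts a producer, reads one item, cancels, and reports whether the stream was dropped.
fn cancel_and_check() -> bool {
    let dropped = Arc::new(AtomicBool::new(false));
    let cancel = Arc::new(AtomicBool::new(false));
    let (tx, rx) = sync_channel::<u64>(1);
    let stream = TrackedStream { counter: 0, dropped: dropped.clone() };
    let producer_cancel = cancel.clone();
    let handle = thread::spawn(move || run_producer(stream, tx, producer_cancel));

    let first = rx.recv().unwrap();
    assert_eq!(first, 1);
    cancel.store(true, Ordering::SeqCst); // cooperative cancel: the producer exits on its own
    handle.join().unwrap();
    dropped.load(Ordering::SeqCst)
}
```

Calling cancel_and_check() exercises the case where the producer is likely parked on a full channel when the cancel arrives. The thread still exits through drop(stream), which is the property a hard abort would not guarantee.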

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@mch2 mch2 requested a review from a team as a code owner June 20, 2026 07:49
@github-actions

github-actions Bot commented Jun 20, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit 8368f14)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The biased select checks the cancellation token first, but if the token is None, std::future::pending::<()>().await is used. This future never completes, so the select will always take the second branch. However, if cancel_token is Some(t) and t.cancelled() completes, the loop breaks immediately. This asymmetry means that when cancel_token is None, the select overhead is present on every iteration even though cancellation is impossible. A more efficient approach would be to use different code paths (e.g., a macro or conditional compilation) to avoid the select entirely when cancel_token is None.

let next = tokio::select! {
    biased;
    _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
    n = stream.next() => n,
};
let res = match next {
    Some(r) => r,
    None => break,
};
let sent = tokio::select! {
    biased;
    _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
    r = tx_captured.send(res) => r,
};

Resource Leak

The _abort_handle returned by new_with_df_error_stream_cancellable is intentionally ignored (not registered) to avoid abort() mid-send. However, if the cancellation token is never fired and the stream is dropped before completion, the spawned task may continue running indefinitely because there is no abort mechanism. This could strand CPU resources and memory pool reservations. The comment states cancel is "token-only," but if the token is never fired and the consumer drops the stream early, the task has no way to detect this and stop.

let (cross_rt_stream, _abort_handle, task_done) =
    CrossRtStream::new_with_df_error_stream_cancellable(df_stream, cpu_exec.clone(), token.clone());
// Reduce path: cancel via the token only, do NOT register the abort handle — an abort() mid-send
// would skip the cross_rt drop+drain cleanup and leak the aggregate's in-flight GroupValues.
if let Some(rt) = cpu_exec.handle() {

Race Condition

The cancellation listener is set after the stream is created and the try block begins. If the task is cancelled between stream creation (line 357) and listener registration (line 360), the cancel signal will not reach the backend, leaving the native stream running. The listener should be registered before any blocking or async operations on the stream begin.

// On cancel, release a fetch parked in the native pull via cooperative cancellation, not
// stream.close() (which would race the in-flight native pull).
task.setCancellationListener(() -> backend.cancelByContext(task.getId()));

@github-actions

github-actions Bot commented Jun 20, 2026

Contributor

PR Code Suggestions ✨

Latest suggestions up to 8368f14

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Register cancellation listener before stream creation

The cancellation listener is set after the stream is created, creating a race window
where cancellation could occur before the listener is registered. Move the
setCancellationListener call before creating the stream to ensure cancellation is
properly handled from the start.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [360]

 task.setCancellationListener(() -> backend.cancelByContext(task.getId()));
+try (FragmentResources ctx = resources) {
+    Iterator<EngineResultBatch> it = ctx.stream().iterator();
Suggestion importance[1-10]: 8

Why: This identifies a critical race condition where cancellation could occur before the listener is registered, potentially leaving the native execution uninterruptible. Moving the listener registration before stream creation ensures proper cancellation handling.

Medium
General
Validate context ID more strictly

The check contextId != 0 may not be sufficient to validate a legitimate context.
Consider whether negative context IDs are valid, and if not, use contextId > 0
instead to prevent potential issues with negative values.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java [1029-1031]

-if (contextId != 0) {
+if (contextId > 0) {
     NativeBridge.cancelQuery(contextId);
 }
Suggestion importance[1-10]: 3

Why: While the suggestion to use contextId > 0 instead of contextId != 0 may be more semantically correct if negative IDs are invalid, the current check is functional and the impact is minor without evidence that negative IDs are problematic in this codebase.

Low

Previous suggestions

Suggestions up to commit 3296c94
Category | Suggestion | Impact
General
Optimize token-less path to avoid pending future

The std::future::pending() branch for None token creates an unnecessary future that
never completes. Consider restructuring to avoid this overhead when no token is
provided, or use a more efficient pattern like conditional compilation of the select
branches.

sandbox/plugins/analytics-backend-datafusion/rust/src/cross_rt_stream.rs [103-107]

-let next = tokio::select! {
-    biased;
-    _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
-    n = stream.next() => n,
+let next = if let Some(ref token) = cancel_token {
+    tokio::select! {
+        biased;
+        _ = token.cancelled() => break,
+        n = stream.next() => n,
+    }
+} else {
+    stream.next().await
 };
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies a minor optimization opportunity where std::future::pending() creates overhead when cancel_token is None. The proposed conditional approach would eliminate this overhead, though the impact is relatively small since most production paths likely use a token.

Low
Avoid creating unused abort handle for token-only cancellation

The abort handle is intentionally ignored but still created. Consider adding a
variant of new_with_df_error_stream_cancellable that doesn't create the abort handle
when token-based cancellation is used, avoiding unnecessary resource allocation.

sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs [1825-1828]

-let (cross_rt_stream, _abort_handle, task_done) =
-    CrossRtStream::new_with_df_error_stream_cancellable(df_stream, cpu_exec.clone(), token.clone());
-// Reduce path: cancel via the token only, do NOT register the abort handle — an abort() mid-send
+let (cross_rt_stream, task_done) =
+    CrossRtStream::new_with_df_error_stream_token_only(df_stream, cpu_exec.clone(), token.clone());
+// Reduce path: cancel via the token only — no abort handle created since abort() mid-send
 // would skip the cross_rt drop+drain cleanup and leak the aggregate's in-flight GroupValues.
Suggestion importance[1-10]: 4

Why: The suggestion identifies a valid optimization where the _abort_handle is intentionally discarded. However, creating a new API variant would add complexity for a minor resource saving. The current approach is acceptable since the abort handle creation overhead is minimal, and the unified API is simpler to maintain.

Low
Suggestions up to commit 92b17a6
Category | Suggestion | Impact
General
Extract duplicated cancellation check logic

The cancellation check logic is duplicated in both tokio::select! blocks. Extract
this into a helper async function or macro to reduce code duplication and improve
maintainability. This also reduces the risk of inconsistencies if the cancellation
logic needs to change.

sandbox/plugins/analytics-backend-datafusion/rust/src/cross_rt_stream.rs [102-119]

+async fn check_cancel(token: &Option<CancellationToken>) {
+    match token {
+        Some(t) => t.cancelled().await,
+        None => std::future::pending::<()>().await,
+    }
+}
 loop {
     let next = tokio::select! {
         biased;
-        _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
+        _ = check_cancel(&cancel_token) => break,
         n = stream.next() => n,
     };
     let res = match next {
         Some(r) => r,
         None => break,
     };
     let sent = tokio::select! {
         biased;
-        _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
+        _ = check_cancel(&cancel_token) => break,
         r = tx_captured.send(res) => r,
     };
     if sent.is_err() {
         break;
     }
 }
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies code duplication in the cancellation check logic. Extracting it into a helper function would improve maintainability. However, the improved_code introduces a new function that would need to be defined outside the closure, and the inline match expression is reasonably clear for this specific use case.

Low
Move variable inside null check

The fetchContextId is redundantly computed before the null check. Move the variable
declaration inside the if-block to avoid unnecessary computation when task is null,
improving code clarity and avoiding potential confusion.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [360-363]

-final long fetchContextId = task != null ? task.getId() : 0L;
 if (task != null) {
+    final long fetchContextId = task.getId();
     task.setCancellationListener(() -> backend.cancelByContext(fetchContextId));
 }
Suggestion importance[1-10]: 4

Why: The suggestion correctly identifies that fetchContextId can be moved inside the null check to avoid redundant computation. However, the impact is minimal since the computation is trivial (just calling task.getId()), and the current code is slightly more readable with the variable declared once upfront.

Low
Suggestions up to commit 4dbd6e1
Category | Suggestion | Impact
General
Simplify conditional context ID handling

The fetchContextId is computed conditionally but always passed to cancelByContext,
which treats 0 as a no-op. Simplify by passing task.getId() directly inside the
listener lambda, eliminating the redundant null check and intermediate variable.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [360-363]

-final long fetchContextId = task != null ? task.getId() : 0L;
 if (task != null) {
-    task.setCancellationListener(() -> backend.cancelByContext(fetchContextId));
+    task.setCancellationListener(() -> backend.cancelByContext(task.getId()));
 }
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies redundancy in the code. However, the improvement is minor—eliminating fetchContextId reduces one line but doesn't significantly impact correctness or maintainability. The existing pattern is clear and defensive.

Low
Extract repeated cancellation future pattern

The match &cancel_token pattern is repeated in both select arms. Extract this into a
helper future or use if let Some(token) to avoid duplication and improve
readability. This reduces the risk of inconsistent behavior if the pattern changes.

sandbox/plugins/analytics-backend-datafusion/rust/src/cross_rt_stream.rs [103-107]

+let cancel_fut = async {
+    if let Some(t) = &cancel_token {
+        t.cancelled().await
+    } else {
+        std::future::pending::<()>().await
+    }
+};
 let next = tokio::select! {
     biased;
-    _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
+    _ = cancel_fut => break,
     n = stream.next() => n,
 };
Suggestion importance[1-10]: 4

Why: The suggestion addresses code duplication and improves readability. However, extracting the future outside the select! macro may change the cancellation semantics slightly (the future is created once vs. on each iteration), though in practice this should be equivalent. The improvement is modest.

Low
Check send result before cancellation

The cancellation check after send may break the loop before verifying sent.is_err(),
potentially masking a send failure. Consider checking the send result first, then
the token, to ensure proper error handling and avoid silent failures.

sandbox/plugins/analytics-backend-datafusion/rust/src/cross_rt_stream.rs [112-119]

-let sent = tokio::select! {
-    biased;
-    _ = async { match &cancel_token { Some(t) => t.cancelled().await, None => std::future::pending::<()>().await } } => break,
-    r = tx_captured.send(res) => r,
-};
+let sent = tx_captured.send(res).await;
+if sent.is_err() {
+    break;
+}
+if let Some(t) = &cancel_token {
+    if t.is_cancelled() {
+        break;
+    }
+}
Suggestion importance[1-10]: 2

Why: The suggestion misunderstands the cooperative cancellation design. The biased select intentionally checks the token first to break immediately on cancel, which is the desired behavior. Checking sent.is_err() first would delay cancellation. The improved code also uses is_cancelled() which is a non-blocking check, changing the semantics from the awaiting cancelled().await.

Low
Suggestions up to commit a6edd63
Category | Suggestion | Impact
General
Replace hardcoded yields with timeout

The hardcoded yield count of 2 may be insufficient for complex execution graphs with
many child tasks. Consider using a more robust approach like a small timeout or
checking task completion status to ensure all child producers complete their
cleanup.

sandbox/plugins/analytics-backend-datafusion/rust/src/cross_rt_stream.rs [124-127]

 drop(stream);
-for _ in 0..2 {
-    tokio::task::yield_now().await;
-}
+// Allow child tasks time to complete cleanup after drop schedules their cancellation
+tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
Suggestion importance[1-10]: 5

Why: The suggestion addresses a potential robustness issue with the hardcoded yield count. However, replacing yields with a timeout changes the semantics and may introduce unnecessary delays in the common case where 2 yields suffice.

Low
Move variable inside conditional block

The fetchContextId is captured in the lambda but only used when task is non-null.
Move the variable declaration inside the if block to avoid unnecessary computation
when task is null and improve code clarity.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [360-363]

-final long fetchContextId = task != null ? task.getId() : 0L;
 if (task != null) {
+    final long fetchContextId = task.getId();
     task.setCancellationListener(() -> backend.cancelByContext(fetchContextId));
 }
Suggestion importance[1-10]: 4

Why: Moving fetchContextId inside the if block improves code clarity and avoids unnecessary computation when task is null. However, the impact is minimal since task.getId() is a simple getter call.

Low

@github-actions

Contributor

✅ Gradle check result for a6edd63: SUCCESS

@codecov

codecov Bot commented Jun 20, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.35%. Comparing base (4cc80cf) to head (8368f14).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##               main   #22248    +/-   ##
==========================================
  Coverage     73.35%   73.35%            
- Complexity    75937    76005    +68     
==========================================
  Files          6071     6075     +4     
  Lines        344993   345282   +289     
  Branches      49638    49697    +59     
==========================================
+ Hits         253080   253295   +215     
- Misses        71710    71757    +47     
- Partials      20203    20230    +27     

- cross_rt_stream: remove the post-drop `for _ in 0..2 { yield_now }` loop.
  A fixed yield count is not sufficient across varying CPU cores; the deferred
  child-task drops are reaped by the bounded flush_cpu_runtime in stream_close
  (4 workers x 32 yields, 500ms-capped), and drop(stream) itself frees the
  aggregate's GroupValues. The loop was a redundant best-effort nudge.
- query_tracker test: move test_cancel_query_flushes_deferred_drops to a unique
  context id (70_001 -> 80_001) so it no longer collides with
  test_top_n_picks_highest_current_bytes in the process-wide registry under
  parallel test execution.
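
The difference between a fixed yield count and a bounded flush can be sketched as below. This is an illustration under assumptions, not the flush_cpu_runtime implementation: bounded_flush and its parameters are hypothetical, and std::thread::yield_now stands in for the runtime's task yield. The helper stops as soon as the work reports done, and otherwise gives up after a yield budget or a wall-clock cap, whichever comes first.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: yield up to `max_yields` times or until `cap` elapses,
/// returning early once `is_done` reports true. The result says whether the work finished.
fn bounded_flush(max_yields: u32, cap: Duration, mut is_done: impl FnMut() -> bool) -> bool {
    let deadline = Instant::now() + cap;
    for _ in 0..max_yields {
        if is_done() {
            return true; // finished early: no further yields are spent
        }
        if Instant::now() >= deadline {
            return false; // wall-clock cap reached before the yield budget ran out
        }
        thread::yield_now();
    }
    is_done() // yield budget exhausted: report the final state
}
```

With the figures quoted in the commit message, a call would look like bounded_flush(32, Duration::from_millis(500), || pending_drops_reaped()), where pending_drops_reaped is a placeholder for whatever completion check the caller has.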

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions

Contributor

Persistent review updated to latest commit 4dbd6e1

…qtf-fetch

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions

Contributor

Persistent review updated to latest commit 92b17a6

@github-actions

Contributor

❌ Gradle check result for 92b17a6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions

Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 3296c94.

Path | Line | Severity | Description
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java | 276 | low | Runtime null guard replaced with a Java assert statement. Java assertions are disabled by default in production JVMs (-ea is not typically passed), so if task is ever null in prod the subsequent task.isCancelled() call will throw a NullPointerException instead of the previously graceful early-return. This is likely an unintentional regression rather than malicious, but the behavioral change warrants review.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions

Contributor

Persistent review updated to latest commit 3296c94

@github-actions

Contributor

❌ Gradle check result for 3296c94: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

…qtf-fetch

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions

Contributor

Persistent review updated to latest commit 8368f14

@github-actions

Contributor

❌ Gradle check result for 8368f14: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Contributor

✅ Gradle check result for 8368f14: SUCCESS

@mch2 mch2 merged commit c73dc2e into opensearch-project:main Jun 21, 2026
19 of 23 checks passed