Skip to content

Cooperatively cancel indexed scan when query task is cancelled#22198

Open
aravindsagar wants to merge 1 commit into
opensearch-project:mainfrom
aravindsagar:cancellation-propagation-indexed-scan
Open

Cooperatively cancel indexed scan when query task is cancelled#22198
aravindsagar wants to merge 1 commit into
opensearch-project:mainfrom
aravindsagar:cancellation-propagation-indexed-scan

Conversation

@aravindsagar

Copy link
Copy Markdown
Contributor

Description

The indexed scan had no cancellation checkpoints. When a query task was cancelled, IndexReader kept dispatching and evaluating every remaining row group to completion, holding its native (Arrow/DataFusion) memory the entire time. The per-row-group evaluator runs inside tokio::task::spawn_blocking, which cannot be aborted once started.

This change threads the per-query CancellationToken (from QUERY_REGISTRY) down through IndexedTableConfig → IndexedExec → IndexReader and adds three cooperative checkpoints:

  • IndexReader::poll_next_row_group — bails before dispatching the next row group, so no new spawn_blocking jobs are admitted after cancellation.
  • IndexReader::fetch_row_group — checks before the evaluator call, so a queued job that starts after cancellation skips its work.
  • IndexedStream::poll_inner — stops draining, so decoded batches in the current segment are released immediately.

Cancellation surfaces as a DataFusionError("query cancelled") and propagates as a clean fragment failure — never partial results.

Queries with context_id == 0 (untracked) and all unit tests pass None for the token, leaving existing behaviour unchanged.

Testing

Unit tests:

  • cancel_stops_row_group_dispatch: verifies that after cancellation, at most one already-in-flight spawn_blocking job (non-abortable) completes, the reader terminates via the cancellation error path, and fewer than all 8 row groups are evaluated.
  • All existing e2e indexed table tests pass unchanged with cancellation_token: None.

End-to-end on a 100M-doc ClickBench cluster (ec2, r8g.2xlarge):

Manual cancellation via _tasks/_cancel:
Ran stats count() by UserID (baseline ~1.4s). Fired _tasks/_cancel at random offsets (166–675ms into the query). All 5 trials returned the query within 8–10ms of the cancel, with tasks_cancelled=3 (coordinator + shard + fragment) and HTTP 500 TaskCancelledException[query cancelled]. Never a partial result.

SBP auto-cancellation:
Configured search_backpressure.mode=enforced, elapsed_time_millis_threshold=500ms, and a non-zero node_duress.native_memory_limit so native-memory duress can trip and populate SBP's candidate task list. Ran memory-heavy concurrent queries. Baseline (SBP disabled): ~8,500ms. Under enforced SBP: all 10 queries cancelled and returned in ~743–1001ms — stopped within one SBP poll cycle of becoming eligible. Error: query cancelled on every trial.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@aravindsagar aravindsagar requested a review from a team as a code owner June 17, 2026 05:34
@github-actions

github-actions Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

(Review updated until commit 5647683)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Race Condition

In poll_next_row_group, the cancellation check at line 255 occurs before checking self.pending_prefetch. If a prefetch job completes between the cancellation check and the pending check, the reader may process that result instead of returning the cancellation error immediately. This creates a narrow window where one additional row group could be processed after cancellation is detected, beyond the documented "at most one already-in-flight" guarantee.

// Bail before dispatching the next row group if the query is cancelled.
if self.is_cancelled() {
    return Poll::Ready(Err(DataFusionError::Execution(
        "query cancelled".to_string(),
    )));
}
if self.current_rg_idx >= self.row_groups.len() {
    return Poll::Ready(Ok(None));
}
if let Some(result) = self.cached_result.take() {
Possible Issue

In fetch_row_group, the cancellation check at line 199 uses is_some_and, which returns false when the token is None. However, the function signature accepts Option<&CancellationToken>, and the call site at line 244 passes token.as_ref(). If token is Some(t) but t.is_cancelled() is false, the check correctly proceeds. But if token is None, the check also proceeds (correct for untracked queries). The logic is sound, but the double-Option pattern (Option<Option<&T>> after as_ref() on Option<T>) is subtle and could be misread during maintenance.

if cancellation_token.is_some_and(|t| t.is_cancelled()) {
    return Err("query cancelled".to_string());
}

@github-actions

github-actions Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Latest suggestions up to 5647683
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Reduce cancellation check frequency overhead

The cancellation check in poll_next_row_group occurs at the start of every loop
iteration, which may add overhead for queries with many small row groups. Consider
checking cancellation less frequently (e.g., every N iterations) to reduce the
performance impact while still maintaining reasonable responsiveness.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/stream.rs [254-259]

-if self.is_cancelled() {
+// Check cancellation every 10 row groups to reduce overhead
+if self.current_rg_idx % 10 == 0 && self.is_cancelled() {
     return Poll::Ready(Err(DataFusionError::Execution(
         "query cancelled".to_string(),
     )));
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion to check cancellation every 10 row groups could reduce overhead, but the current implementation uses a cheap atomic load (is_cancelled() is documented as "Cheap (one relaxed atomic load)"). The trade-off between responsiveness and performance is questionable, and the hardcoded value of 10 may not be appropriate for all workloads.

Low
Improve cancellation token check pattern

The cancellation check uses is_some_and which requires the token to be Some and
cancelled. However, the function signature accepts Option<&CancellationToken>, so a None token will
never trigger cancellation. Consider checking if the token reference itself is
cancelled directly for consistency with the is_cancelled() method pattern.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/stream.rs [199-201]

-if cancellation_token.is_some_and(|t| t.is_cancelled()) {
-    return Err("query cancelled".to_string());
+if let Some(token) = cancellation_token {
+    if token.is_cancelled() {
+        return Err("query cancelled".to_string());
+    }
 }
Suggestion importance[1-10]: 3

__

Why: While the suggested pattern is more explicit, the existing is_some_and is functionally equivalent and more concise. Both correctly handle None tokens (no cancellation). This is a minor style preference with no functional impact.

Low

Previous suggestions

Suggestions up to commit 8b87043
CategorySuggestion                                                                                                                                    Impact
General
Use consistent cancellation error messages

The error message "query cancelled" is a generic string. Consider using a dedicated
error type or constant to ensure consistency across all cancellation error paths and
improve error handling.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/stream.rs [199-201]

+const QUERY_CANCELLED_MSG: &str = "query cancelled";
+
 if cancellation_token.is_some_and(|t| t.is_cancelled()) {
-    return Err("query cancelled".to_string());
+    return Err(QUERY_CANCELLED_MSG.to_string());
 }
Suggestion importance[1-10]: 4

__

Why: Using a constant for error messages improves consistency and maintainability. However, the impact is minor since the message is already consistent across the codebase in this PR.

Low
Optimize hot-path cancellation checks

The is_cancelled method is called in hot paths (poll_inner loop and
poll_next_row_group loop). Consider caching the cancellation state or checking less
frequently to reduce atomic load overhead, especially when processing many small row
groups.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/stream.rs [176-180]

 fn is_cancelled(&self) -> bool {
     self.cancellation_token
         .as_ref()
-        .is_some_and(|t| t.is_cancelled())
+        .map_or(false, |t| t.is_cancelled())
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion changes is_some_and to map_or which is functionally equivalent but doesn't reduce overhead. The actual optimization mentioned (caching or checking less frequently) is not implemented in the improved_code.

Low
Reduce cancellation check frequency overhead

The cancellation check occurs at the start of every poll_next_row_group loop
iteration. If row groups are small, this could add measurable overhead. Consider
checking cancellation only after processing N row groups or after a time threshold.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/stream.rs [254-259]

-if self.is_cancelled() {
+// Check cancellation every N row groups to reduce overhead
+if self.current_rg_idx % 10 == 0 && self.is_cancelled() {
     return Poll::Ready(Err(DataFusionError::Execution(
         "query cancelled".to_string(),
     )));
 }
Suggestion importance[1-10]: 2

__

Why: While reducing check frequency could improve performance, checking every 10 row groups could delay cancellation response significantly. The suggestion lacks justification for the specific threshold and may compromise the responsiveness goal of the cancellation feature.

Low

@himshikhagupta

Copy link
Copy Markdown
Contributor

How are we ensuring that all data is getting dropped in this flow when query is cancelled? Did we do any memory leak analysis with this?

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 8b87043: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

The indexed scan had no cancellation checkpoints, so a cancelled query kept
running every remaining row group to completion. The per-row-group evaluator
runs inside tokio::task::spawn_blocking (non-abortable), so the scan held
and grew its native (Arrow/DataFusion) memory until natural completion.

Thread the per-query CancellationToken (from the global QUERY_REGISTRY) down
through IndexedTableConfig -> IndexedExec -> IndexReader and add cooperative
checkpoints:
  - IndexReader::poll_next_row_group: bail before dispatching the next row group.
  - IndexReader::fetch_row_group: check before the evaluator call so a queued
    blocking job that starts after cancel skips its work.
  - IndexedStream::poll_inner: stop draining so decoded batches are released.

Cancellation surfaces as a query-level DataFusionError ("query cancelled")
which propagates as a clean fragment failure (never partial results).

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@aravindsagar aravindsagar force-pushed the cancellation-propagation-indexed-scan branch from 8b87043 to 751f50c Compare June 19, 2026 08:40
@github-actions

Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 751f50c.

PathLineSeverityDescription
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/CancelTestDelay.java1mediumNew production class adds environment-variable-triggered execution delays (5 seconds) into multiple critical query execution paths. Setting ANALYTICS_CANCEL_TEST_STAGE or the JVM system property analytics.cancel.test.stage to any stage value (1-6) in a production or shared environment would introduce significant stalls on search threads, enabling a denial-of-service condition. Test injection hooks in production code should never be controllable via external environment variables.
sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs355mediumA 30-second tokio::sleep is injected directly into the async query execution path and activated via the ANALYTICS_CANCEL_TEST_STAGE=4 environment variable. If this variable is set in a production Rust process, every query will stall for 30 seconds before reading any Parquet data. Debug/test timing hooks of this magnitude do not belong in production code paths.
sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/AbstractDatafusionReduceSink.java170mediumA hardcoded 30-second Thread.sleep is embedded in the reduce drain loop, gated by a local parseCancelStage() helper that reads ANALYTICS_CANCEL_TEST_STAGE. This duplicates the CancelTestDelay pattern with a significantly longer delay (30 s vs. 5 s) and using a private method rather than the shared utility class, making it harder to audit and remove. Setting stage=6 in any environment with this code deployed would block every reduce operation for 30 seconds.
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/shard/ShardFragmentStageExecution.java132mediumCancelTestDelay.sleep(5) is placed inside onStreamResponse(), which is called for every streaming batch — not just the first, despite the comment saying 'hold on coordinator when first batch arrives'. When ANALYTICS_CANCEL_TEST_STAGE=5, every batch response on the coordinator will be delayed by 5 seconds, multiplying the total delay by the number of batches. This appears to be a logic error in the test hook placement that could severely amplify latency.
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/shard/ShardTaskRunner.java55lowCancelTestDelay.sleep(2) is inserted in the production fragment dispatch path before transport.dispatchFragmentStreaming. When stage=2 is active, every fragment dispatch to every shard is delayed, stalling all concurrent queries. Low severity because it is disabled by default, but represents an unnecessarily broad blast radius for a test hook (affects all shards, not just one targeted delay point).

The table above displays the top 10 most important findings.

Total: 5 | Critical: 0 | High: 0 | Medium: 4 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@aravindsagar aravindsagar marked this pull request as draft June 19, 2026 12:09
@aravindsagar aravindsagar force-pushed the cancellation-propagation-indexed-scan branch from 751f50c to 5647683 Compare June 19, 2026 13:04
@github-actions

Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5647683

@aravindsagar aravindsagar marked this pull request as ready for review June 19, 2026 13:06
@aravindsagar

Copy link
Copy Markdown
Contributor Author

How are we ensuring that all data is getting dropped in this flow when query is cancelled? Did we do any memory leak analysis with this?

This is not targeted towards fixing memory leak, but to ensure that queries stop execution when cancelled. Did some test runs where the memory leak wasn't any worse with this change.

@github-actions

Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5647683: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants