-
Notifications
You must be signed in to change notification settings - Fork 6.2k
planner: correlate subquery rule (#66206) #68752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-8.5
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,12 @@ go_test( | |
| timeout = "short", | ||
| srcs = [ | ||
| "main_test.go", | ||
| <<<<<<< HEAD | ||
| ======= | ||
| "rule_cdc_join_reorder_test.go", | ||
| "rule_common_handle_ordering_test.go", | ||
| "rule_correlate_test.go", | ||
| >>>>>>> 7357a2e2f90 (planner: correlate subquery rule (#66206)) | ||
|
Comment on lines
+8
to
+13
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolve the merge conflict markers before merging. This target still has unresolved Also applies to: 24-28 🤖 Prompt for AI Agents |
||
| "rule_derive_topn_from_window_test.go", | ||
| "rule_eliminate_projection_test.go", | ||
| "rule_inject_extra_projection_test.go", | ||
|
|
@@ -15,7 +21,11 @@ go_test( | |
| ], | ||
| data = glob(["testdata/**"]), | ||
| flaky = True, | ||
| <<<<<<< HEAD | ||
| shard_count = 12, | ||
| ======= | ||
| shard_count = 23, | ||
| >>>>>>> 7357a2e2f90 (planner: correlate subquery rule (#66206)) | ||
| deps = [ | ||
| "//pkg/config", | ||
| "//pkg/domain", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,11 +29,24 @@ var testDataMap = make(testdata.BookKeeper) | |
| func TestMain(m *testing.M) { | ||
| testsetup.SetupForCommonTest() | ||
| flag.Parse() | ||
| <<<<<<< HEAD | ||
| testDataMap.LoadTestSuiteData("testdata", "outer2inner") | ||
| testDataMap.LoadTestSuiteData("testdata", "derive_topn_from_window") | ||
| testDataMap.LoadTestSuiteData("testdata", "join_reorder_suite") | ||
| testDataMap.LoadTestSuiteData("testdata", "predicate_pushdown_suite") | ||
| testDataMap.LoadTestSuiteData("testdata", "predicate_simplification") | ||
| ======= | ||
| testDataMap.LoadTestSuiteData("testdata", "outer2inner", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "derive_topn_from_window", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "join_reorder_suite", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "predicate_pushdown_suite", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "predicate_simplification", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "outer_to_semi_join_suite", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "correlate_suite", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "cdc_join_reorder_suite", true) | ||
| testDataMap.LoadTestSuiteData("testdata", "order_aware_join_reorder_suite", true) | ||
|
|
||
| >>>>>>> 7357a2e2f90 (planner: correlate subquery rule (#66206)) | ||
|
Comment on lines
+32
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Resolve the merge conflict markers in The Also applies to: 87-105 🤖 Prompt for AI Agents |
||
| opts := []goleak.Option{ | ||
| goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), | ||
| goleak.IgnoreTopFunction("github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1"), | ||
|
|
@@ -71,3 +84,22 @@ func GetPredicatePushdownSuiteData() testdata.TestData { | |
| func GetPredicateSimplificationSuiteData() testdata.TestData { | ||
| return testDataMap["predicate_simplification"] | ||
| } | ||
| <<<<<<< HEAD | ||
| ======= | ||
|
|
||
| func GetOuterToSemiJoinSuiteData() testdata.TestData { | ||
| return testDataMap["outer_to_semi_join_suite"] | ||
| } | ||
|
|
||
| func GetCorrelateSuiteData() testdata.TestData { | ||
| return testDataMap["correlate_suite"] | ||
| } | ||
|
|
||
| func GetCDCJoinReorderSuiteData() testdata.TestData { | ||
| return testDataMap["cdc_join_reorder_suite"] | ||
| } | ||
|
|
||
| func GetOrderAwareJoinReorderSuiteData() testdata.TestData { | ||
| return testDataMap["order_aware_join_reorder_suite"] | ||
| } | ||
| >>>>>>> 7357a2e2f90 (planner: correlate subquery rule (#66206)) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,250 @@ | ||
| // Copyright 2026 PingCAP, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package rule | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "strings" | ||
| "testing" | ||
|
|
||
| "github.com/pingcap/tidb/pkg/testkit" | ||
| "github.com/pingcap/tidb/pkg/testkit/testdata" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // TestCorrelateNullSemantics verifies that CorrelateSolver does not break | ||
| // 3-valued NULL semantics for scalar IN (LeftOuterSemiJoin). | ||
| func TestCorrelateNullSemantics(t *testing.T) { | ||
| store := testkit.CreateMockStore(t) | ||
| tk := testkit.NewTestKit(t, store) | ||
| tk.MustExec("use test") | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
|
|
||
| // Case 1: non-null outer, null inner → must return NULL (not 0). | ||
| tk.MustExec("drop table if exists tn, sn") | ||
| tk.MustExec("create table tn(a int)") | ||
| tk.MustExec("create table sn(a int, key(a))") | ||
| tk.MustExec("insert into tn values (1)") | ||
| tk.MustExec("insert into sn values (null)") | ||
| tk.MustQuery("select tn.a in (select sn.a from sn) as r from tn").Check(testkit.Rows("<nil>")) | ||
|
|
||
| // Case 2: null outer, non-null inner → must return NULL (not 0). | ||
| tk.MustExec("truncate table tn") | ||
| tk.MustExec("truncate table sn") | ||
| tk.MustExec("insert into tn values (null)") | ||
| tk.MustExec("insert into sn values (1)") | ||
| tk.MustQuery("select tn.a in (select sn.a from sn) as r from tn").Check(testkit.Rows("<nil>")) | ||
|
|
||
| // Case 3: both columns NOT NULL → correlate is safe; verify correct results. | ||
| tk.MustExec("drop table if exists tnn, snn") | ||
| tk.MustExec("create table tnn(a int not null)") | ||
| tk.MustExec("create table snn(a int not null, key(a))") | ||
| tk.MustExec("insert into tnn values (1), (2), (3)") | ||
| tk.MustExec("insert into snn values (1), (2)") | ||
| tk.MustQuery("select tnn.a in (select snn.a from snn) as r from tnn order by tnn.a").Check(testkit.Rows("1", "1", "0")) | ||
| } | ||
|
|
||
| // TestCorrelateAlternativeChoosesApply verifies that the correlate alternative | ||
| // round produces an Apply plan that wins the cost comparison for a non-correlated | ||
| // IN subquery when an outer WHERE predicate reduces the estimated row count. | ||
| // Without alternative plans, the InnerJoin+Agg rewrite produces IndexJoin+StreamAgg. | ||
| // With alternative plans, the correlate round produces Apply+Limit which is cheaper | ||
| // (avoids the StreamAgg overhead and uses Limit 1 for early exit on the inner side). | ||
| func TestCorrelateAlternativeChoosesApply(t *testing.T) { | ||
| store := testkit.CreateMockStore(t) | ||
| tk := testkit.NewTestKit(t, store) | ||
| tk.MustExec("use test") | ||
|
|
||
| tk.MustExec("drop table if exists t1, t2") | ||
| tk.MustExec("create table t1 (a int not null, b int, key(a))") | ||
| tk.MustExec("create table t2 (a int not null, b int, key(a))") | ||
| tk.MustExec("insert into t1 values (1,1),(2,2),(3,3)") | ||
| tk.MustExec("insert into t2 values (1,10),(2,20)") | ||
|
|
||
| sql := "select * from t1 where b = 1 and a in (select a from t2)" | ||
|
|
||
| // Without alternative plans: standard InnerJoin+Agg path produces IndexJoin. | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = OFF") | ||
| rows := tk.MustQuery("explain format = 'brief' " + sql).Rows() | ||
| require.True(t, explainContains(rows, "IndexJoin"), | ||
| "without alternative plans, expected IndexJoin in plan:\n%s", joinExplainRows(rows)) | ||
|
|
||
| // With alternative plans: correlate round produces Apply (cheaper than IndexJoin+StreamAgg). | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
| rows = tk.MustQuery("explain format = 'brief' " + sql).Rows() | ||
| require.True(t, explainContains(rows, "Apply"), | ||
| "with alternative plans, expected Apply in plan:\n%s", joinExplainRows(rows)) | ||
|
|
||
| // Verify correct results in both modes. | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = OFF") | ||
| tk.MustQuery(sql).Check(testkit.Rows("1 1")) | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
| tk.MustQuery(sql).Check(testkit.Rows("1 1")) | ||
| } | ||
|
|
||
| func TestCorrelate(tt *testing.T) { | ||
| testkit.RunTestUnderCascades(tt, func(t *testing.T, tk *testkit.TestKit, cascades, caller string) { | ||
| tk.MustExec("use test") | ||
| tk.MustExec("drop table if exists t1, t2, t3") | ||
| tk.MustExec("create table t1 (a int, b int, key(a))") | ||
| tk.MustExec("create table t2 (a int, b int, key(a))") | ||
| tk.MustExec("create table t3 (a int, b int, key(a))") | ||
| tk.MustExec("insert into t1 values (1,1),(2,2),(3,3)") | ||
| tk.MustExec("insert into t2 values (1,10),(2,20)") | ||
| tk.MustExec("insert into t3 values (10,1),(20,2)") | ||
|
|
||
| // Enable the correlate rule. | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
|
|
||
| var input []string | ||
| var output []struct { | ||
| SQL string | ||
| Plan []string | ||
| Result []string | ||
| } | ||
| suite := GetCorrelateSuiteData() | ||
| suite.LoadTestCases(t, &input, &output, cascades, caller) | ||
| for i, sql := range input { | ||
| testdata.OnRecord(func() { | ||
| output[i].SQL = sql | ||
| output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + sql).Rows()) | ||
| output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(sql).Rows()) | ||
| }) | ||
| tk.MustQuery("explain format = 'brief' " + sql).Check(testkit.Rows(output[i].Plan...)) | ||
| tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...)) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // explainContains scans all explain rows for a substring in the operator column. | ||
| func explainContains(rows [][]any, substr string) bool { | ||
| for _, row := range rows { | ||
| if strings.Contains(row[0].(string), substr) { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // joinExplainRows formats explain rows into a single string for debug output. | ||
| func joinExplainRows(rows [][]any) string { | ||
| var sb strings.Builder | ||
| for _, row := range rows { | ||
| sb.WriteString(row[0].(string)) | ||
| sb.WriteByte('\n') | ||
| } | ||
| return sb.String() | ||
| } | ||
|
|
||
| // TestCorrelateParallelApply verifies that when the correlate alternative round | ||
| // produces an Apply plan and tidb_enable_parallel_apply is ON, the Apply is | ||
| // executed with parallel concurrency. This tests the interaction between the | ||
| // correlate optimization (converting decorrelated semi-join back to Apply) and | ||
| // the parallel apply executor. | ||
| func TestCorrelateParallelApply(t *testing.T) { | ||
| store := testkit.CreateMockStore(t) | ||
| tk := testkit.NewTestKit(t, store) | ||
| tk.MustExec("use test") | ||
|
|
||
| tk.MustExec("drop table if exists t1, t2") | ||
| tk.MustExec("create table t1 (a int not null, b int, key(a))") | ||
| tk.MustExec("create table t2 (a int not null, b int, key(a))") | ||
| tk.MustExec("insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5)") | ||
| tk.MustExec("insert into t2 values (1,10),(2,20),(3,30)") | ||
|
|
||
| sql := "select * from t1 where b = 1 and a in (select a from t2)" | ||
|
|
||
| // Enable correlate alternative + parallel apply. | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
| tk.MustExec("set tidb_enable_parallel_apply = ON") | ||
| tk.MustExec("set tidb_executor_concurrency = 5") | ||
|
|
||
| // Verify the plan contains Apply (correlate alternative won). | ||
| rows := tk.MustQuery("explain format = 'brief' " + sql).Rows() | ||
| require.True(t, explainContains(rows, "Apply"), | ||
| "with correlate alternative + parallel apply, expected Apply in plan:\n%s", joinExplainRows(rows)) | ||
|
|
||
| // Verify EXPLAIN ANALYZE reports Concurrency > 1 for the Apply. | ||
| analyzeRows := tk.MustQuery("explain analyze " + sql).Rows() | ||
| foundConcurrency := false | ||
| for _, row := range analyzeRows { | ||
| line := fmt.Sprintf("%v", row) | ||
| if strings.Contains(line, "Apply") && strings.Contains(line, "Concurrency:") { | ||
| idx := strings.Index(line, "Concurrency:") | ||
| if idx >= 0 { | ||
| rest := line[idx+len("Concurrency:"):] | ||
| var n int | ||
| if _, err := fmt.Sscanf(rest, "%d", &n); err == nil && n > 1 { | ||
| foundConcurrency = true | ||
| } | ||
| } | ||
| break | ||
| } | ||
| } | ||
| require.True(t, foundConcurrency, | ||
| "EXPLAIN ANALYZE must report Concurrency > 1 for Apply when parallel_apply is on") | ||
|
Comment on lines
+167
to
+197
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use multiple qualifying outer rows before asserting This fixture leaves only one outer row after 🤖 Prompt for AI AgentsThere was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
My original concern still stands: in To make the test deterministic, consider inserting multiple rows into // Replace current t1 inserts with rows that give multiple qualifying outer rows:
tk.MustExec("insert into t1 values (1,1),(2,1),(3,1),(4,4),(5,5)")
// Now b=1 matches (1,1),(2,1),(3,1) and t2 has (1,10),(2,20),(3,30),
// so 3 outer rows feed into the Apply, making Concurrency>1 meaningful.Or alternatively, relax the query predicate so more than one outer row qualifies before the subquery evaluation. 🧠 Learnings used |
||
|
|
||
| // Verify correctness: parallel + correlate must match serial + no correlate. | ||
| tk.MustExec("set tidb_enable_parallel_apply = OFF") | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = OFF") | ||
| serialRows := tk.MustQuery(sql).Rows() | ||
|
|
||
| tk.MustExec("set tidb_enable_parallel_apply = ON") | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
| parallelRows := tk.MustQuery(sql).Rows() | ||
|
|
||
| require.Equal(t, serialRows, parallelRows, | ||
| "correlate alternative + parallel apply must produce the same result as standard path") | ||
| } | ||
|
|
||
| // TestCorrelateWithCostFactors verifies that when hash/merge join cost factors | ||
| // are increased, the correlate alternative round wins and produces Apply-based | ||
| // plans with correlated index access for cases that normally choose HashJoin. | ||
| func TestCorrelateWithCostFactors(tt *testing.T) { | ||
| testkit.RunTestUnderCascades(tt, func(t *testing.T, tk *testkit.TestKit, cascades, caller string) { | ||
| tk.MustExec("use test") | ||
| tk.MustExec("drop table if exists t1, t2, t3") | ||
| tk.MustExec("create table t1 (a int, b int, key(a))") | ||
| tk.MustExec("create table t2 (a int, b int, key(a))") | ||
| tk.MustExec("create table t3 (a int, b int, key(a))") | ||
| tk.MustExec("insert into t1 values (1,1),(2,2),(3,3)") | ||
| tk.MustExec("insert into t2 values (1,10),(2,20)") | ||
| tk.MustExec("insert into t3 values (10,1),(20,2)") | ||
|
|
||
| // Enable the correlate rule and penalize hash/merge joins so the | ||
| // correlate alternative (Apply with index lookup) wins the cost comparison. | ||
| tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON") | ||
| tk.MustExec("set tidb_opt_hash_join_cost_factor = 1000") | ||
| tk.MustExec("set tidb_opt_merge_join_cost_factor = 1000") | ||
|
|
||
| var input []string | ||
| var output []struct { | ||
| SQL string | ||
| Plan []string | ||
| Result []string | ||
| } | ||
| suite := GetCorrelateSuiteData() | ||
| suite.LoadTestCases(t, &input, &output, cascades, caller) | ||
| for i, sql := range input { | ||
| testdata.OnRecord(func() { | ||
| output[i].SQL = sql | ||
| output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + sql).Rows()) | ||
| output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(sql).Rows()) | ||
| }) | ||
| tk.MustQuery("explain format = 'brief' " + sql).Check(testkit.Rows(output[i].Plan...)) | ||
| tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...)) | ||
| } | ||
| }) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| [ | ||
| { | ||
| "name": "TestCorrelate", | ||
| "cases": [ | ||
| "select * from t1 where exists (select 1 from t2 where t2.a = t1.a)", | ||
| "select * from t1 where not exists (select 1 from t2 where t2.a = t1.a)", | ||
| "select * from t1 where a in (select a from t2)", | ||
| "select * from t1 where exists (select 1 from t2)", | ||
| "select * from t1 where a not in (select a from t2)", | ||
| "select * from t1 where exists (select 1 from t2 where t2.a > t1.a)", | ||
| "select * from t1 where exists (select 1 from t2 where t2.a = t1.a and t2.b > t1.b)", | ||
| "select * from t1 where exists (select /*+ NO_DECORRELATE() */ 1 from t2 where t2.a = t1.a)", | ||
| "select * from t1 where a in (select t2.a from t2 inner join t3 on t3.a = t2.b where t3.b > 0)", | ||
| "select * from t1 where a in (select a from t2) order by a limit 10", | ||
| "select * from t1 where a in (select a from t2 where b > 1)", | ||
| "select * from t1 where a in (select a from t2 group by a)", | ||
| "select * from t1 where a in (select a from t2 where b > 1 group by a)", | ||
| "select * from t1 where a in (select a from t2 limit 10)", | ||
| "select * from t1 where a in (select a from t2 order by a limit 10)", | ||
| "select * from t1 where b = 1 and a in (select a from t2)", | ||
| "select * from t1 where b = 1 and exists (select 1 from t2 where t2.a = t1.a) limit 1", | ||
| "select * from t1 where b = 1 and a not in (select a from t2) limit 1", | ||
| "select * from t1 where b = 1 and a in (select a from t2 where t2.b > 0) limit 1" | ||
| ] | ||
| }, | ||
| { | ||
| "name": "TestCorrelateWithCostFactors", | ||
| "cases": [ | ||
| "select * from t1 where exists (select 1 from t2 where t2.a = t1.a)", | ||
| "select * from t1 where not exists (select 1 from t2 where t2.a = t1.a)", | ||
| "select * from t1 where a in (select a from t2)", | ||
| "select * from t1 where exists (select 1 from t2 where t2.a > t1.a)", | ||
| "select * from t1 where exists (select 1 from t2 where t2.a = t1.a and t2.b > t1.b)", | ||
| "select * from t1 where a in (select a from t2) order by a limit 10", | ||
| "select * from t1 where a in (select a from t2 where b > 1)", | ||
| "select * from t1 where a in (select a from t2 order by a limit 10)" | ||
| ] | ||
| } | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolve the leftover merge conflict in the
srcslist.The
<<<<<<</=======/>>>>>>>markers are still present here, so this Bazel file will not parse and the cherry-pick cannot build.🤖 Prompt for AI Agents