Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
da93b6f
planner: correlate subquery rule (wip)
terry1purcell Feb 11, 2026
171333a
review comments1
terry1purcell Feb 11, 2026
fca2997
add testcases1
terry1purcell Feb 11, 2026
1d7e38a
add testcases2
terry1purcell Feb 11, 2026
0f6c877
Merge branch 'master' into correlate
terry1purcell Feb 19, 2026
62df8f8
refactor for order
terry1purcell Feb 19, 2026
5ba3cca
refactor for cost based evaluation
terry1purcell Feb 19, 2026
a0631a5
refactor for 2nd customer example
terry1purcell Feb 20, 2026
2a39db6
increase test coverage
terry1purcell Feb 21, 2026
c1a3d75
new pantheon review comments
terry1purcell Feb 22, 2026
88efc6a
update bazel
terry1purcell Feb 22, 2026
0511568
hint fix
terry1purcell Feb 24, 2026
a807f60
pushdown hint fix
terry1purcell Feb 24, 2026
7fb0ecc
Merge branch 'master' into correlate
terry1purcell Feb 25, 2026
14ff48e
Merge branch 'master' into correlate
terry1purcell Apr 6, 2026
155437a
refactor after alternative plan PR implemented
terry1purcell Apr 6, 2026
261c6e8
build error
terry1purcell Apr 6, 2026
b4b1be2
review comments after refactor
terry1purcell Apr 7, 2026
88bfc9b
claude review
terry1purcell Apr 7, 2026
9e14702
add parallel apply
terry1purcell Apr 7, 2026
ac97f9e
import reorder
terry1purcell Apr 7, 2026
e53692a
review comments
terry1purcell Apr 10, 2026
7d49f71
copilot review comments
terry1purcell Apr 11, 2026
4c32338
move clones per review
terry1purcell Apr 17, 2026
6121e3a
Merge branch 'pingcap:master' into correlate
terry1purcell Apr 17, 2026
dd0f84f
review comments2
terry1purcell Apr 21, 2026
448665b
review comments3
terry1purcell Apr 21, 2026
34bbc9e
Merge branch 'master' into correlate
terry1purcell Apr 21, 2026
be086e8
update bazel
terry1purcell Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pkg/bindinfo/binding_auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package bindinfo_test

import (
"fmt"
"slices"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testdata"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -230,6 +232,30 @@ func TestRelevantOptVarsAndFixes(t *testing.T) {
}
}

// TestRelevantOptVarsCorrelateSubquery checks that
// tidb_opt_enable_alternative_logical_plans shows up in the variables returned
// by RecordRelevantOptVarsAndFixes for a non-correlated IN subquery, both when
// the variable is OFF and when it is ON.
func TestRelevantOptVarsCorrelateSubquery(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec(`create table t1 (a int, b int, key(a))`)
	tk.MustExec(`create table t2 (a int, b int, key(a))`)

	query := "select * from t1 where a in (select a from t2)"
	target := vardef.TiDBOptEnableAlternativeLogicalPlans
	sqlParser := parser.New()

	// The variable is expected to be recorded as relevant under either setting
	// because the code path where it affects plan choice (correlate-to-Apply)
	// was reached.
	for _, setting := range []string{"OFF", "ON"} {
		tk.MustExec("set tidb_opt_enable_alternative_logical_plans = " + setting)

		sqlParser.Reset()
		stmtNode, parseErr := sqlParser.ParseOneStmt(query, "", "")
		require.NoError(t, parseErr)

		recorded, _, recordErr := bindinfo.RecordRelevantOptVarsAndFixes(tk.Session(), stmtNode)
		require.NoError(t, recordErr)

		found := slices.Contains(recorded, target)
		require.True(t, found,
			"enabled=%s: expected %s in recorded vars %v", setting, target, recorded)
	}
}

func TestExplainExploreAnalyze(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
6 changes: 5 additions & 1 deletion pkg/bindinfo/binding_plan_generation.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ func genPlanUnderState(sctx sessionctx.Context, stmt ast.StmtNode, state *state)
sctx.GetSessionVars().EnableSemiJoinRewrite = state.varValues[i].(bool)
case vardef.TiDBOptSelectivityFactor:
sctx.GetSessionVars().SelectivityFactor = state.varValues[i].(float64)
case vardef.TiDBOptEnableAlternativeLogicalPlans:
sctx.GetSessionVars().EnableAlternativeLogicalPlans = state.varValues[i].(bool)
default:
return nil, fmt.Errorf("unsupported variable %s in plan generation", varName)
}
Expand Down Expand Up @@ -547,7 +549,7 @@ func adjustVar(varName string, varVal any) (newVarVal any, err error) {
}
// increase 0.1 each step
return v + 0.1, nil
case vardef.TiDBOptPreferRangeScan, vardef.TiDBOptEnableNoDecorrelateInSelect, vardef.TiDBOptAlwaysKeepJoinKey, vardef.TiDBOptEnableSemiJoinRewrite: // flip the switch
case vardef.TiDBOptPreferRangeScan, vardef.TiDBOptEnableNoDecorrelateInSelect, vardef.TiDBOptAlwaysKeepJoinKey, vardef.TiDBOptEnableSemiJoinRewrite, vardef.TiDBOptEnableAlternativeLogicalPlans: // flip the switch
return !varVal.(bool), nil
}
return nil, fmt.Errorf("unsupported variable %s in plan generation", varName)
Expand Down Expand Up @@ -640,6 +642,8 @@ func getStartState(vars []string, fixes []uint64, indexHintCount int) (*state, e
s.varValues = append(s.varValues, vardef.DefOptSelectivityFactor)
case vardef.TiDBOptCartesianJoinOrderThreshold:
s.varValues = append(s.varValues, vardef.DefOptCartesianJoinOrderThreshold)
case vardef.TiDBOptEnableAlternativeLogicalPlans:
s.varValues = append(s.varValues, vardef.DefOptEnableAlternativeLogicalPlans)
default:
return nil, fmt.Errorf("unsupported variable %s in plan generation", varName)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ go_library(
"//pkg/util/dbterror/plannererrors",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/topsql",
"//pkg/util/tracing",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"rule_aggregation_elimination.go",
"rule_aggregation_push_down.go",
"rule_aggregation_skew_rewrite.go",
"rule_correlate.go",
"rule_decorrelate.go",
"rule_derive_topn_from_window.go",
"rule_eliminate_empty_selection.go",
Expand Down
3 changes: 2 additions & 1 deletion pkg/planner/core/casetest/rule/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
"main_test.go",
"rule_cdc_join_reorder_test.go",
"rule_common_handle_ordering_test.go",
"rule_correlate_test.go",
"rule_derive_topn_from_window_test.go",
"rule_eliminate_empty_selection_test.go",
"rule_eliminate_projection_test.go",
Expand All @@ -20,7 +21,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 30,
shard_count = 35,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/casetest/rule/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestMain(m *testing.M) {
testDataMap.LoadTestSuiteData("testdata", "predicate_pushdown_suite", true)
testDataMap.LoadTestSuiteData("testdata", "predicate_simplification", true)
testDataMap.LoadTestSuiteData("testdata", "outer_to_semi_join_suite", true)
testDataMap.LoadTestSuiteData("testdata", "correlate_suite", true)
testDataMap.LoadTestSuiteData("testdata", "cdc_join_reorder_suite", true)
testDataMap.LoadTestSuiteData("testdata", "order_aware_join_reorder_suite", true)

Expand Down Expand Up @@ -80,6 +81,10 @@ func GetOuterToSemiJoinSuiteData() testdata.TestData {
return testDataMap["outer_to_semi_join_suite"]
}

// GetCorrelateSuiteData returns the test data stored in testDataMap under the
// "correlate_suite" key.
func GetCorrelateSuiteData() testdata.TestData {
	suite := testDataMap["correlate_suite"]
	return suite
}

// GetCDCJoinReorderSuiteData returns the test data stored in testDataMap under
// the "cdc_join_reorder_suite" key.
func GetCDCJoinReorderSuiteData() testdata.TestData {
	suite := testDataMap["cdc_join_reorder_suite"]
	return suite
}
Expand Down
250 changes: 250 additions & 0 deletions pkg/planner/core/casetest/rule/rule_correlate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
"fmt"
"strings"
"testing"

"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testdata"
"github.com/stretchr/testify/require"
)

// TestCorrelateNullSemantics guards the 3-valued NULL semantics of scalar IN
// (LeftOuterSemiJoin) while alternative logical plans, and with them the
// CorrelateSolver, are enabled. Each scenario runs its setup statements in
// order and then compares the query result against the expected rows.
func TestCorrelateNullSemantics(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON")

	scenarios := []struct {
		setup []string
		query string
		want  []string
	}{
		{
			// Non-null outer value, null inner value: the result must be NULL, not 0.
			setup: []string{
				"drop table if exists tn, sn",
				"create table tn(a int)",
				"create table sn(a int, key(a))",
				"insert into tn values (1)",
				"insert into sn values (null)",
			},
			query: "select tn.a in (select sn.a from sn) as r from tn",
			want:  []string{"<nil>"},
		},
		{
			// Null outer value, non-null inner value: the result must be NULL, not 0.
			setup: []string{
				"truncate table tn",
				"truncate table sn",
				"insert into tn values (null)",
				"insert into sn values (1)",
			},
			query: "select tn.a in (select sn.a from sn) as r from tn",
			want:  []string{"<nil>"},
		},
		{
			// Both columns NOT NULL: correlate is safe, so only plain results are checked.
			setup: []string{
				"drop table if exists tnn, snn",
				"create table tnn(a int not null)",
				"create table snn(a int not null, key(a))",
				"insert into tnn values (1), (2), (3)",
				"insert into snn values (1), (2)",
			},
			query: "select tnn.a in (select snn.a from snn) as r from tnn order by tnn.a",
			want:  []string{"1", "1", "0"},
		},
	}

	for _, sc := range scenarios {
		for _, stmt := range sc.setup {
			tk.MustExec(stmt)
		}
		tk.MustQuery(sc.query).Check(testkit.Rows(sc.want...))
	}
}

// TestCorrelateAlternativeChoosesApply checks the plan chosen for a
// non-correlated IN subquery combined with an outer WHERE predicate:
//   - with alternative logical plans OFF the explain output must mention
//     IndexJoin (the InnerJoin+Agg rewrite);
//   - with alternative logical plans ON the explain output must mention Apply,
//     i.e. the correlate alternative is expected to win the cost comparison.
//
// The query result must be the same in both modes.
func TestCorrelateAlternativeChoosesApply(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	for _, stmt := range []string{
		"drop table if exists t1, t2",
		"create table t1 (a int not null, b int, key(a))",
		"create table t2 (a int not null, b int, key(a))",
		"insert into t1 values (1,1),(2,2),(3,3)",
		"insert into t2 values (1,10),(2,20)",
	} {
		tk.MustExec(stmt)
	}

	const (
		query       = "select * from t1 where b = 1 and a in (select a from t2)"
		altPlansOff = "set tidb_opt_enable_alternative_logical_plans = OFF"
		altPlansOn  = "set tidb_opt_enable_alternative_logical_plans = ON"
	)
	explainQuery := "explain format = 'brief' " + query

	// Standard path: the InnerJoin+Agg rewrite is expected to yield an IndexJoin.
	tk.MustExec(altPlansOff)
	offPlan := tk.MustQuery(explainQuery).Rows()
	hasIndexJoin := explainContains(offPlan, "IndexJoin")
	require.True(t, hasIndexJoin,
		"without alternative plans, expected IndexJoin in plan:\n%s", joinExplainRows(offPlan))

	// Alternative path: the correlate round is expected to yield an Apply.
	tk.MustExec(altPlansOn)
	onPlan := tk.MustQuery(explainQuery).Rows()
	hasApply := explainContains(onPlan, "Apply")
	require.True(t, hasApply,
		"with alternative plans, expected Apply in plan:\n%s", joinExplainRows(onPlan))

	// The result set must not depend on which plan was picked.
	for _, toggle := range []string{altPlansOff, altPlansOn} {
		tk.MustExec(toggle)
		tk.MustQuery(query).Check(testkit.Rows("1 1"))
	}
}

// TestCorrelate runs the correlate suite cases with alternative logical plans
// enabled and compares both the brief explain output and the query result of
// every input statement against the recorded expectations.
func TestCorrelate(tt *testing.T) {
	type correlateCase struct {
		SQL    string
		Plan   []string
		Result []string
	}

	testkit.RunTestUnderCascades(tt, func(t *testing.T, tk *testkit.TestKit, cascades, caller string) {
		for _, stmt := range []string{
			"use test",
			"drop table if exists t1, t2, t3",
			"create table t1 (a int, b int, key(a))",
			"create table t2 (a int, b int, key(a))",
			"create table t3 (a int, b int, key(a))",
			"insert into t1 values (1,1),(2,2),(3,3)",
			"insert into t2 values (1,10),(2,20)",
			"insert into t3 values (10,1),(20,2)",
			// Enable the correlate rule.
			"set tidb_opt_enable_alternative_logical_plans = ON",
		} {
			tk.MustExec(stmt)
		}

		var (
			input  []string
			output []correlateCase
		)
		suite := GetCorrelateSuiteData()
		suite.LoadTestCases(t, &input, &output, cascades, caller)

		for idx, stmt := range input {
			explainStmt := "explain format = 'brief' " + stmt

			// In record mode, refresh the stored expectation before checking it.
			testdata.OnRecord(func() {
				rec := &output[idx]
				rec.SQL = stmt
				rec.Plan = testdata.ConvertRowsToStrings(tk.MustQuery(explainStmt).Rows())
				rec.Result = testdata.ConvertRowsToStrings(tk.MustQuery(stmt).Rows())
			})

			tk.MustQuery(explainStmt).Check(testkit.Rows(output[idx].Plan...))
			tk.MustQuery(stmt).Check(testkit.Rows(output[idx].Result...))
		}
	})
}

// explainContains reports whether the operator column (the first cell) of any
// explain row contains substr.
//
// Rows that are empty, or whose first cell is not a string, are skipped
// instead of triggering an index-out-of-range or type-assertion panic. A
// malformed row therefore shows up as an ordinary assertion failure in the
// calling test rather than a panic that aborts the test run.
func explainContains(rows [][]any, substr string) bool {
	for _, row := range rows {
		if len(row) == 0 {
			continue
		}
		// Comma-ok assertion: a non-string cell simply cannot match.
		if op, ok := row[0].(string); ok && strings.Contains(op, substr) {
			return true
		}
	}
	return false
}

// joinExplainRows renders the operator column (the first cell) of every
// explain row on its own line and returns the concatenation, for use in
// assertion failure messages.
//
// Because this helper only builds diagnostics, it must not panic itself: an
// empty row contributes a blank line, and a non-string first cell is rendered
// with fmt's default formatting instead of being type-asserted to string.
func joinExplainRows(rows [][]any) string {
	var sb strings.Builder
	for _, row := range rows {
		if len(row) > 0 {
			// Fprint writes a string operand verbatim, so well-formed rows
			// produce exactly the same text as a direct WriteString would.
			fmt.Fprint(&sb, row[0])
		}
		sb.WriteByte('\n')
	}
	return sb.String()
}

// TestCorrelateParallelApply covers the interaction between the correlate
// alternative (which is expected to turn the decorrelated semi-join back into
// an Apply) and the parallel apply executor. With both
// tidb_opt_enable_alternative_logical_plans and tidb_enable_parallel_apply ON
// it checks that:
//   - the brief explain output mentions Apply;
//   - the first EXPLAIN ANALYZE row mentioning both Apply and "Concurrency:"
//     reports a concurrency greater than 1;
//   - the query result equals the one obtained with both switches OFF.
func TestCorrelateParallelApply(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")

	for _, stmt := range []string{
		"drop table if exists t1, t2",
		"create table t1 (a int not null, b int, key(a))",
		"create table t2 (a int not null, b int, key(a))",
		"insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5)",
		"insert into t2 values (1,10),(2,20),(3,30)",
	} {
		tk.MustExec(stmt)
	}

	const query = "select * from t1 where b = 1 and a in (select a from t2)"

	// Turn on the correlate alternative together with parallel apply.
	tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON")
	tk.MustExec("set tidb_enable_parallel_apply = ON")
	tk.MustExec("set tidb_executor_concurrency = 5")

	// The chosen plan must contain an Apply (the correlate alternative won).
	planRows := tk.MustQuery("explain format = 'brief' " + query).Rows()
	require.True(t, explainContains(planRows, "Apply"),
		"with correlate alternative + parallel apply, expected Apply in plan:\n%s", joinExplainRows(planRows))

	// Only the first row that mentions both Apply and the concurrency marker
	// is inspected; its value decides the outcome.
	const marker = "Concurrency:"
	parallel := false
	for _, row := range tk.MustQuery("explain analyze " + query).Rows() {
		text := fmt.Sprintf("%v", row)
		if !strings.Contains(text, "Apply") {
			continue
		}
		_, tail, hasMarker := strings.Cut(text, marker)
		if !hasMarker {
			continue
		}
		var workers int
		_, scanErr := fmt.Sscanf(tail, "%d", &workers)
		parallel = scanErr == nil && workers > 1
		break
	}
	require.True(t, parallel,
		"EXPLAIN ANALYZE must report Concurrency > 1 for Apply when parallel_apply is on")

	// Correctness: parallel + correlate must match serial + no correlate.
	tk.MustExec("set tidb_enable_parallel_apply = OFF")
	tk.MustExec("set tidb_opt_enable_alternative_logical_plans = OFF")
	baseline := tk.MustQuery(query).Rows()

	tk.MustExec("set tidb_enable_parallel_apply = ON")
	tk.MustExec("set tidb_opt_enable_alternative_logical_plans = ON")
	withParallel := tk.MustQuery(query).Rows()

	require.Equal(t, baseline, withParallel,
		"correlate alternative + parallel apply must produce the same result as standard path")
}

// TestCorrelateWithCostFactors runs the correlate suite cases with alternative
// logical plans enabled and the hash/merge join cost factors raised to 1000.
// The penalty is meant to let the correlate alternative (Apply with index
// lookup) win the cost comparison for statements that would otherwise pick a
// hash join. Plans and results are compared against the recorded expectations.
func TestCorrelateWithCostFactors(tt *testing.T) {
	type correlateCase struct {
		SQL    string
		Plan   []string
		Result []string
	}

	testkit.RunTestUnderCascades(tt, func(t *testing.T, tk *testkit.TestKit, cascades, caller string) {
		for _, stmt := range []string{
			"use test",
			"drop table if exists t1, t2, t3",
			"create table t1 (a int, b int, key(a))",
			"create table t2 (a int, b int, key(a))",
			"create table t3 (a int, b int, key(a))",
			"insert into t1 values (1,1),(2,2),(3,3)",
			"insert into t2 values (1,10),(2,20)",
			"insert into t3 values (10,1),(20,2)",
			// Enable the correlate rule and penalize hash/merge joins.
			"set tidb_opt_enable_alternative_logical_plans = ON",
			"set tidb_opt_hash_join_cost_factor = 1000",
			"set tidb_opt_merge_join_cost_factor = 1000",
		} {
			tk.MustExec(stmt)
		}

		var (
			input  []string
			output []correlateCase
		)
		suite := GetCorrelateSuiteData()
		suite.LoadTestCases(t, &input, &output, cascades, caller)

		for idx, stmt := range input {
			explainStmt := "explain format = 'brief' " + stmt

			// In record mode, refresh the stored expectation before checking it.
			testdata.OnRecord(func() {
				rec := &output[idx]
				rec.SQL = stmt
				rec.Plan = testdata.ConvertRowsToStrings(tk.MustQuery(explainStmt).Rows())
				rec.Result = testdata.ConvertRowsToStrings(tk.MustQuery(stmt).Rows())
			})

			tk.MustQuery(explainStmt).Check(testkit.Rows(output[idx].Plan...))
			tk.MustQuery(stmt).Check(testkit.Rows(output[idx].Result...))
		}
	})
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Loading
Loading