Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/planner/core",
"//pkg/planner/core/base",
"//pkg/planner/core/resolve",
"//pkg/planner/core/rule",
"//pkg/planner/indexadvisor",
"//pkg/planner/planctx",
"//pkg/planner/property",
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/core/casetest/correlated/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_test(
"//pkg/testkit/testdata",
"//pkg/testkit/testmain",
"//pkg/testkit/testsetup",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
],
)
75 changes: 75 additions & 0 deletions pkg/planner/core/casetest/correlated/correlated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package correlated

import (
"fmt"
"strings"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testdata"
"github.com/stretchr/testify/require"
)

func TestCorrelatedSubquery(t *testing.T) {
Expand Down Expand Up @@ -98,6 +102,68 @@ func TestNaturalJoinWithCorrelatedSubquery(tt *testing.T) {
tk.MustQuery("explain format = 'plan_tree' " + sql).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...))
}

t.Run("AlternativeLogicalPlansChooseApply", func(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists alt_pick_t1, alt_pick_t2, alt_pick_t3")
tk.MustExec("create table alt_pick_t1(a int primary key)")
tk.MustExec("create table alt_pick_t2(a int, b int, key idx_a(a))")
tk.MustExec("create table alt_pick_t3(a int, c int, key idx_a(a))")
tk.MustExec("insert into alt_pick_t1 values (1), (2)")

vals2 := make([]string, 0, 200)
vals3 := make([]string, 0, 200)
for i := 0; i < 200; i++ {
vals2 = append(vals2, fmt.Sprintf("(%d, %d)", i%100, i))
vals3 = append(vals3, fmt.Sprintf("(%d, %d)", i%100, i))
}
tk.MustExec("insert into alt_pick_t2 values " + strings.Join(vals2, ","))
tk.MustExec("insert into alt_pick_t3 values " + strings.Join(vals3, ","))
tk.MustExec("analyze table alt_pick_t1, alt_pick_t2, alt_pick_t3")

tk.MustExec("set @@tidb_opt_enable_alternative_logical_plans=off")
sql := "select alt_pick_t1.a, (select count(*) from alt_pick_t2 join alt_pick_t3 on alt_pick_t2.a = alt_pick_t3.a where alt_pick_t2.a = alt_pick_t1.a) as cnt from alt_pick_t1 order by alt_pick_t1.a"
explainSQL := "explain format = 'brief' " + sql

offPlan := testdata.ConvertRowsToStrings(tk.MustQuery(explainSQL).Rows())
tk.MustQuery(sql).Check(testkit.Rows("1 4", "2 4"))
require.False(t, planContainsText(offPlan, "Apply"), strings.Join(offPlan, "\n"))

tk.MustExec("set @@tidb_opt_enable_alternative_logical_plans=on")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/failIfAlternativeLogicalPlanRoundTriggered", fmt.Sprintf("return(%q)", sql)))
err := tk.ExecToErr(sql)
stmtCtx := tk.Session().GetSessionVars().StmtCtx
require.True(t, stmtCtx.AlternativeLogicalPlanDecorrelatedApply)
require.False(t, stmtCtx.AlternativeLogicalPlanSameOrderIndexJoin)
require.ErrorContains(t, err, "unexpected alternative logical plan round")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/failIfAlternativeLogicalPlanRoundTriggered"))

onPlan := testdata.ConvertRowsToStrings(tk.MustQuery(explainSQL).Rows())
tk.MustQuery(sql).Check(testkit.Rows("1 4", "2 4"))
require.True(t, planContainsText(onPlan, "Apply"), strings.Join(onPlan, "\n"))
})

t.Run("AlternativeLogicalPlansSkipSecondRoundWhenIndexJoinExists", func(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("set @@tidb_opt_enable_alternative_logical_plans=on")
tk.MustExec("drop table if exists alt_skip_t1, alt_skip_t2")
tk.MustExec("create table alt_skip_t1(a int primary key)")
tk.MustExec("create table alt_skip_t2(a int, b int, key idx_a(a))")
tk.MustExec("insert into alt_skip_t1 values (1), (2), (3)")
tk.MustExec("insert into alt_skip_t2 values (1, 1), (1, 2), (2, 3), (3, 4)")
tk.MustExec("analyze table alt_skip_t1, alt_skip_t2")

sql := "select alt_skip_t1.a from alt_skip_t1 where exists (select 1 from alt_skip_t2 where alt_skip_t2.a = alt_skip_t1.a and alt_skip_t2.b > 0) order by alt_skip_t1.a"
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/failIfAlternativeLogicalPlanRoundTriggered", fmt.Sprintf("return(%q)", sql)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/failIfAlternativeLogicalPlanRoundTriggered"))
}()

require.NoError(t, tk.QueryToErr(sql))
stmtCtx := tk.Session().GetSessionVars().StmtCtx
require.True(t, stmtCtx.AlternativeLogicalPlanDecorrelatedApply)
require.True(t, stmtCtx.AlternativeLogicalPlanSameOrderIndexJoin)
})
})
}

Expand All @@ -114,3 +180,12 @@ func TestWrongDecorrelate(t *testing.T) {
" 30025.20000000000000000000 60121022342",
"X 6.23000000000000000000 60021022342"))
}

func planContainsText(plan []string, needle string) bool {
for _, row := range plan {
if strings.Contains(row, needle) {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func constructIndexJoinStatic(
// for static enumeration here, we just pass down the original equal condition for condition adjustment rather
// depend on the original logical join node.
EqualConditions: p.EqualConditions,
// Only count candidates that keep the original Apply outer/inner order.
FromDecorrelatedApply: p.FromDecorrelatedApply && outerIdx == 0,
}.Init(p.SCtx(), p.StatsInfo().ScaleByExpectCnt(p.SCtx().GetSessionVars(), prop.ExpectedCnt), p.QueryBlockOffset(), chReqProps...)
join.SetSchema(p.Schema())
return []base.PhysicalPlan{join}
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/operator/logicalop/logical_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ type LogicalJoin struct {

// allJoinLeaf is used to identify the table where the column is located during constant propagation.
allJoinLeaf []*expression.Schema

// FromDecorrelatedApply marks joins that come from decorrelating an Apply in the
// first logical round. It is only used to decide whether an equivalent same-order
// PhysicalIndexJoin candidate has already been generated.
FromDecorrelatedApply bool
}

// Init initializes LogicalJoin.
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/operator/physicalop/physical_index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type PhysicalIndexJoin struct {
InnerHashKeys []*expression.Column
// EqualConditions stores the equal conditions for logical join's original EqualConditions.
EqualConditions []*expression.ScalarFunction `plan-cache-clone:"shallow"`

// FromDecorrelatedApply is true only when this IndexJoin keeps the original
// Apply outer/inner order after decorrelation.
FromDecorrelatedApply bool
}

// Init initializes PhysicalIndexJoin.
Expand Down Expand Up @@ -99,6 +103,7 @@ func (p *PhysicalIndexJoin) Clone(newCtx base.PlanContext) (base.PhysicalPlan, e
cloned.CompareFilters = p.CompareFilters.cloneForPlanCache()
cloned.OuterHashKeys = util.CloneCols(p.OuterHashKeys)
cloned.InnerHashKeys = util.CloneCols(p.InnerHashKeys)
cloned.FromDecorrelatedApply = p.FromDecorrelatedApply
return cloned, nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ func (s *DecorrelateSolver) optimize(ctx context.Context, p base.LogicalPlan, gr
join := &apply.LogicalJoin
join.SetSelf(join)
join.SetTP(plancodec.TypeJoin)
if p.SCtx().GetSessionVars().EnableAlternativeLogicalPlans {
p.SCtx().GetSessionVars().StmtCtx.MarkAlternativeLogicalPlanDecorrelatedApply()
join.FromDecorrelatedApply = true
}
p = join
} else if apply.NoDecorrelate {
goto NoOptimize
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func indexJoinAttach2Task(p *physicalop.PhysicalIndexJoin, tasks ...base.Task) b
outerTask := tasks[1-p.InnerChildIdx].ConvertToRootTask(p.SCtx())
innerTask := tasks[p.InnerChildIdx].ConvertToRootTask(p.SCtx())
completePhysicalIndexJoin(p, innerTask.(*physicalop.RootTask), innerTask.Plan().Schema(), outerTask.Plan().Schema(), true)
if p.FromDecorrelatedApply {
p.SCtx().GetSessionVars().StmtCtx.MarkAlternativeLogicalPlanSameOrderIndexJoin()
}
if p.InnerChildIdx == 1 {
p.SetChildren(outerTask.Plan(), innerTask.Plan())
} else {
Expand Down
58 changes: 49 additions & 9 deletions pkg/planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/resolve"
"github.com/pingcap/tidb/pkg/planner/core/rule"
"github.com/pingcap/tidb/pkg/planner/indexadvisor"
"github.com/pingcap/tidb/pkg/planner/planctx"
"github.com/pingcap/tidb/pkg/planner/property"
Expand Down Expand Up @@ -478,6 +479,7 @@ func buildAndOptimizeLogicalPlanRound(
bestNames *types.NameSlice,
bestCost *float64,
bestLogicalPlanCtx *logicalPlanBuildCtx,
optFlagAdjust func(uint64) uint64,
) (base.Plan, types.NameSlice, bool, error) {
builder := planBuilderPool.Get().(*core.PlanBuilder)
defer planBuilderPool.Put(builder.ResetForReuse())
Expand Down Expand Up @@ -526,7 +528,11 @@ func buildAndOptimizeLogicalPlanRound(
*optimizeStarted = true
*beginOpt = time.Now()
}
finalPlan, cost, err := core.DoOptimize(ctx, sctx, builder.GetOptFlag(), logic)
optFlag := builder.GetOptFlag()
if optFlagAdjust != nil {
optFlag = optFlagAdjust(optFlag)
}
finalPlan, cost, err := core.DoOptimize(ctx, sctx, optFlag, logic)
if err != nil {
return nil, nil, false, err
}
Expand All @@ -545,6 +551,12 @@ func buildAndOptimizeLogicalPlanRound(
// optimizeCnt is a global variable only used for test.
var optimizeCnt int

func shouldTryAlternativeLogicalPlanRound(sessVars *variable.SessionVars) bool {
return sessVars.EnableAlternativeLogicalPlans &&
sessVars.StmtCtx.AlternativeLogicalPlanDecorrelatedApply &&
!sessVars.StmtCtx.AlternativeLogicalPlanSameOrderIndexJoin
}

func optimize(ctx context.Context, sctx planctx.PlanContext, node *resolve.NodeW, is infoschema.InfoSchema) (base.Plan, types.NameSlice, float64, error) {
failpoint.Inject("checkOptimizeCountOne", func(val failpoint.Value) {
// only count the optimization for SQL with specified text
Expand Down Expand Up @@ -577,8 +589,7 @@ func optimize(ctx context.Context, sctx planctx.PlanContext, node *resolve.NodeW

// build multi logical plan from raw AST.
var (
buildRound = 1
needRestoreLogicalPlanCtx = buildRound > 1
needRestoreLogicalPlanCtx = sessVars.EnableAlternativeLogicalPlans
bestCost = math.MaxFloat64
bestPlan base.PhysicalPlan
bestNames types.NameSlice
Expand All @@ -588,13 +599,42 @@ func optimize(ctx context.Context, sctx planctx.PlanContext, node *resolve.NodeW
var initialLogicalPlanCtx logicalPlanBuildCtx
if needRestoreLogicalPlanCtx {
initialLogicalPlanCtx = saveLogicalPlanBuildCtx(sessVars)
sessVars.StmtCtx.ResetAlternativeLogicalPlanSignals()
}
for i := range buildRound {
if needRestoreLogicalPlanCtx && i > 0 {
restoreLogicalPlanBuildCtx(sessVars, initialLogicalPlanCtx)
}

p, names, nonLogical, err := buildAndOptimizeLogicalPlanRound(
p, names, nonLogical, err := buildAndOptimizeLogicalPlanRound(
ctx,
sctx,
node,
is,
hintProcessor,
&checked,
&optimizeStarted,
&beginOpt,
needRestoreLogicalPlanCtx,
&bestPlan,
&bestNames,
&bestCost,
&bestLogicalPlanCtx,
nil,
)
if err != nil {
return nil, nil, 0, err
}
if nonLogical {
// keep compatible with the old.
return p, names, 0, nil
}

if shouldTryAlternativeLogicalPlanRound(sessVars) {
restoreLogicalPlanBuildCtx(sessVars, initialLogicalPlanCtx)
failpoint.Inject("failIfAlternativeLogicalPlanRoundTriggered", func(val failpoint.Value) {
if testSQL, ok := val.(string); ok && testSQL == node.Node.OriginalText() {
failpoint.Return(nil, nil, 0, errors.New("unexpected alternative logical plan round"))
}
})

p, names, nonLogical, err = buildAndOptimizeLogicalPlanRound(
ctx,
sctx,
node,
Expand All @@ -608,12 +648,12 @@ func optimize(ctx context.Context, sctx planctx.PlanContext, node *resolve.NodeW
&bestNames,
&bestCost,
&bestLogicalPlanCtx,
func(flag uint64) uint64 { return flag &^ rule.FlagDecorrelate },
)
if err != nil {
return nil, nil, 0, err
}
if nonLogical {
// keep compatible with the old.
return p, names, 0, nil
}
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,12 @@ type StatementContext struct {
UseDynamicPruneMode bool
// ColRefFromPlan mark the column ref used by assignment in update statement.
ColRefFromUpdatePlan intset.FastIntSet
// AlternativeLogicalPlanDecorrelatedApply indicates whether the current logical
// optimization round decorrelated at least one Apply into Join.
AlternativeLogicalPlanDecorrelatedApply bool
// AlternativeLogicalPlanSameOrderIndexJoin indicates whether the current first
// round already produced a same-order index join candidate for a decorrelated Apply.
AlternativeLogicalPlanSameOrderIndexJoin bool

// IsExplainAnalyzeDML is true if the statement is "explain analyze DML executors", before responding the explain
// results to the client, the transaction should be committed first. See issue #37373 for more details.
Expand Down Expand Up @@ -646,6 +652,25 @@ func (sc *StatementContext) RestoreLogicalPlanBuildState(state LogicalPlanBuildS
sc.RangeFallbackHandler = contextutil.NewRangeFallbackHandler(&sc.PlanCacheTracker, sc)
}

// ResetAlternativeLogicalPlanSignals clears the statement-local signals used by the
// alternative logical plan feature.
func (sc *StatementContext) ResetAlternativeLogicalPlanSignals() {
sc.AlternativeLogicalPlanDecorrelatedApply = false
sc.AlternativeLogicalPlanSameOrderIndexJoin = false
}

// MarkAlternativeLogicalPlanDecorrelatedApply records that at least one Apply has
// been decorrelated into a Join in the current round.
func (sc *StatementContext) MarkAlternativeLogicalPlanDecorrelatedApply() {
sc.AlternativeLogicalPlanDecorrelatedApply = true
}

// MarkAlternativeLogicalPlanSameOrderIndexJoin records that the current first round
// has already produced a same-order index join candidate for a decorrelated Apply.
func (sc *StatementContext) MarkAlternativeLogicalPlanSameOrderIndexJoin() {
sc.AlternativeLogicalPlanSameOrderIndexJoin = true
}

// CtxID returns the context id of the statement
func (sc *StatementContext) CtxID() uint64 {
return sc.ctxID
Expand Down
6 changes: 6 additions & 0 deletions pkg/sessionctx/vardef/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ const (
// TiDBOptEnableNoDecorrelateInSelect is used to control whether to enable the NO_DECORRELATE hint for subqueries in the select list.
TiDBOptEnableNoDecorrelateInSelect = "tidb_opt_enable_no_decorrelate_in_select"

// TiDBOptEnableAlternativeLogicalPlans controls whether the optimizer may build
// an extra non-decorrelate logical alternative when decorrelation does not
// produce an equivalent same-order index join candidate.
TiDBOptEnableAlternativeLogicalPlans = "tidb_opt_enable_alternative_logical_plans"

// TiDBEnableSemiJoinRewrite controls automatic rewrite of semi-join to
// inner-join with aggregation (equivalent to SEMI_JOIN_REWRITE() hint).
TiDBOptEnableSemiJoinRewrite = "tidb_opt_enable_semi_join_rewrite"
Expand Down Expand Up @@ -1456,6 +1461,7 @@ const (
DefOptInSubqToJoinAndAgg = true
DefOptPreferRangeScan = true
DefOptEnableNoDecorrelateInSelect = false
DefOptEnableAlternativeLogicalPlans = false
DefOptEnableSemiJoinRewrite = false
DefBatchInsert = false
DefBatchDelete = false
Expand Down
6 changes: 6 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,11 @@ type SessionVars struct {
// EnableNoDecorrelateInSelect enables the NO_DECORRELATE hint for subqueries in the select list.
EnableNoDecorrelateInSelect bool

// EnableAlternativeLogicalPlans enables building an extra non-decorrelate
// logical alternative when decorrelation does not produce an equivalent
// same-order index join candidate.
EnableAlternativeLogicalPlans bool

// EnableSemiJoinRewrite enables the SEMI_JOIN_REWRITE hint for subqueries in the where clause.
EnableSemiJoinRewrite bool

Expand Down Expand Up @@ -2332,6 +2337,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
CartesianJoinOrderThreshold: vardef.DefOptCartesianJoinOrderThreshold,
EnableOuterJoinReorder: vardef.DefTiDBEnableOuterJoinReorder,
EnableNoDecorrelateInSelect: vardef.DefOptEnableNoDecorrelateInSelect,
EnableAlternativeLogicalPlans: vardef.DefOptEnableAlternativeLogicalPlans,
EnableSemiJoinRewrite: vardef.DefOptEnableSemiJoinRewrite,
RetryLimit: vardef.DefTiDBRetryLimit,
DisableTxnAutoRetry: vardef.DefTiDBDisableTxnAutoRetry,
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/setvar_affect.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var isHintUpdatableVerified = map[string]struct{}{
"mpp_version": {},
"tidb_enable_inl_join_inner_multi_pattern": {},
"tidb_opt_enable_no_decorrelate_in_select": {},
"tidb_opt_enable_alternative_logical_plans": {},
"tidb_opt_enable_late_materialization": {},
"tidb_opt_ordering_index_selectivity_threshold": {},
"tidb_opt_ordering_index_selectivity_ratio": {},
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,10 @@ var defaultSysVars = []*SysVar{
s.EnableNoDecorrelateInSelect = TiDBOptOn(val)
return nil
}},
{Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.TiDBOptEnableAlternativeLogicalPlans, Value: BoolToOnOff(vardef.DefOptEnableAlternativeLogicalPlans), Type: vardef.TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableAlternativeLogicalPlans = TiDBOptOn(val)
return nil
}},
{Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.TiDBEnableStrictDoubleTypeCheck, Value: BoolToOnOff(vardef.DefEnableStrictDoubleTypeCheck), Type: vardef.TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableStrictDoubleTypeCheck = TiDBOptOn(val)
return nil
Expand Down
Loading