Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,35 +495,46 @@ func (b *PlanBuilder) buildResultSetNode(ctx context.Context, node ast.ResultSet
return nil, err
}

// Clone output names before modifying to avoid mutating shared structs
if x.AsName.L != "" {
clonedNames := make([]*types.FieldName, len(p.OutputNames()))
for i, name := range p.OutputNames() {
if name.Hidden {
clonedNames[i] = name
continue
}
// Clone the field name and update table name.
// For derived tables, clear DBName so that error messages (e.g. only_full_group_by)
// show "alias.col" not "db.alias.col". The current-database qualifier needed for
// hint generation (leading()) is set separately on plannerSelectBlockAsName below.
// For base-table aliases (isTableName), inherit DBName for DEFAULT() resolution.
dbName := ast.NewCIStr("")
if isTableName {
dbName = name.DBName
if x.Lateral {
// LATERAL derived tables: clone output names to avoid mutating shared
// structs that may be referenced by the outer scope's schema.
clonedNames := make([]*types.FieldName, len(p.OutputNames()))
for i, name := range p.OutputNames() {
if name.Hidden {
clonedNames[i] = name
continue
}
// Clone the field name and update table name.
// For derived tables, clear DBName so that error messages (e.g. only_full_group_by)
// show "alias.col" not "db.alias.col". The current-database qualifier needed for
// hint generation (leading()) is set separately on plannerSelectBlockAsName below.
// For base-table aliases (isTableName), inherit DBName for DEFAULT() resolution.
dbName := ast.NewCIStr("")
if isTableName {
dbName = name.DBName
}
clonedNames[i] = &types.FieldName{
DBName: dbName,
OrigTblName: name.OrigTblName,
OrigColName: name.OrigColName,
TblName: x.AsName,
ColName: name.ColName,
NotExplicitUsable: name.NotExplicitUsable,
Redundant: name.Redundant,
Hidden: name.Hidden,
}
}
clonedNames[i] = &types.FieldName{
DBName: dbName,
OrigTblName: name.OrigTblName,
OrigColName: name.OrigColName,
TblName: x.AsName,
ColName: name.ColName,
NotExplicitUsable: name.NotExplicitUsable,
Redundant: name.Redundant,
Hidden: name.Hidden,
p.SetOutputNames(clonedNames)
} else {
// Non-LATERAL: update TblName in place (original behavior).
for _, name := range p.OutputNames() {
if name.Hidden {
continue
}
name.TblName = x.AsName
}
}
p.SetOutputNames(clonedNames)
}
// Apply column alias list from AS dt(c1, c2, ...) syntax.
// Only valid for derived tables (not table names); parser enforces this.
Expand Down Expand Up @@ -674,14 +685,35 @@ func findJoinFullSchema(p base.LogicalPlan) (*expression.Schema, types.NameSlice
func containsLateralTableSource(node ast.ResultSetNode) bool {
switch n := node.(type) {
case *ast.TableSource:
return n.Lateral
if n.Lateral {
return true
}
// Descend into the inner source (derived table / set-op) so nested
// LATERAL inside a subquery or set-op used as a table source is detected.
return containsLateralTableSource(n.Source)
case *ast.Join:
// For parenthesized single table refs, the parser creates Join{Left: TableSource, Right: nil}
if n.Right == nil {
return containsLateralTableSource(n.Left)
}
// Check both sides for nested LATERAL
return containsLateralTableSource(n.Left) || containsLateralTableSource(n.Right)
case *ast.SelectStmt:
// Descend into the FROM clause of a derived subquery.
if n.From != nil {
return containsLateralTableSource(n.From.TableRefs)
}
return false
case *ast.SetOprStmt:
// Check each operand in the UNION/INTERSECT/EXCEPT list.
if n.SelectList != nil {
for _, sel := range n.SelectList.Selects {
if rs, ok := sel.(ast.ResultSetNode); ok && containsLateralTableSource(rs) {
return true
}
}
}
return false
default:
return false
}
Expand Down Expand Up @@ -982,7 +1014,6 @@ func (b *PlanBuilder) buildLateralJoin(ctx context.Context, leftPlan, rightPlan

ap.SetChildren(leftPlan, rightPlan)
ap.SetSchema(expression.MergeSchema(leftPlan.Schema(), rightPlan.Schema()))
setIsInApplyForCTE(rightPlan, ap.Schema())

// Note: nullability adjustment is not needed for InnerJoin (the only type supported currently).
// When LEFT/RIGHT JOIN support is added, ResetNotNullFlag must be called here.
Expand Down Expand Up @@ -1040,6 +1071,10 @@ func (b *PlanBuilder) buildLateralJoin(ctx context.Context, leftPlan, rightPlan
ap.FullNames = append(ap.FullNames, &name)
}

// Mark inner CTEs against FullSchema so correlations via USING/NATURAL
// merged columns are detected and the CTE storage is reset per outer row.
setIsInApplyForCTE(rightPlan, ap.FullSchema)

// Handle ON conditions if present
if joinNode.On != nil {
b.curClause = onClause
Expand Down
9 changes: 5 additions & 4 deletions pkg/planner/core/operator/logicalop/logical_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ func (la *LogicalApply) DeriveStats(childStats []*property.StatsInfo, selfSchema
if la.JoinType == base.LeftOuterJoin {
rowCount = max(rowCount, leftProfile.RowCount)
}
} else if la.JoinType == base.SemiJoin || la.JoinType == base.AntiSemiJoin {
// For SemiJoin and AntiSemiJoin Apply operators (EXISTS / NOT EXISTS
// subqueries that cannot be decorrelated), apply SelectionFactor to
// the row count estimate, consistent with LogicalJoin.DeriveStats.
} else if la.IsLateral && (la.JoinType == base.SemiJoin || la.JoinType == base.AntiSemiJoin) {
// For LATERAL SemiJoin/AntiSemiJoin Apply operators, apply SelectionFactor
// to the row count estimate, consistent with LogicalJoin.DeriveStats.
// Non-lateral Apply (correlated subqueries) keeps the original left row count
// to avoid changing existing plan estimates.
rowCount *= cost.SelectionFactor
}
la.SetStats(&property.StatsInfo{
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/operator/logicalop/logical_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
vars := p.SCtx().GetSessionVars()
savedParallelApply := vars.EnableParallelApply
vars.EnableParallelApply = false
defer func() { vars.EnableParallelApply = savedParallelApply }()
_, p.Cte.RecursivePartPhysicalPlan, _, err = utilfuncp.DoOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.RecursivePartLogicalPlan)
vars.EnableParallelApply = savedParallelApply
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, will the original code cause any issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On deeper analysis, the original code works correctly in normal execution, but it is not panic-safe: if DoOptimize panics, the restore statement placed after the call never runs, so EnableParallelApply = false would leak for the rest of the session. It is not clear to me how many users currently enable parallel Apply. This fix is already in the cherry-pick branch (where the bug was found), and it is unlikely that anyone else has hit this issue, since the lateral join fix was only recently merged to master.

if err != nil {
return nil, false, err
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,10 @@ func (s *DecorrelateSolver) optimize(ctx context.Context, p base.LogicalPlan, gr
// to find the underlying LogicalJoin, matching the schema used for name
// resolution in LATERAL subqueries (see logical_plan_builder.go buildJoin).
outerSchema := outerPlan.Schema()
if fullSchema, _ := findJoinFullSchema(outerPlan); fullSchema != nil {
outerSchema = fullSchema
if apply.IsLateral {
if fullSchema, _ := findJoinFullSchema(outerPlan); fullSchema != nil {
outerSchema = fullSchema
}
}
apply.CorCols = coreusage.ExtractCorColumnsBySchema4LogicalPlan(innerPlan, outerSchema)
if len(apply.CorCols) == 0 {
Expand All @@ -258,7 +260,7 @@ func (s *DecorrelateSolver) optimize(ctx context.Context, p base.LogicalPlan, gr
// Notice that no matter what kind of join is, it's always right.
newConds := make([]expression.Expression, 0, len(sel.Conditions))
for _, cond := range sel.Conditions {
newConds = append(newConds, cond.Decorrelate(outerPlan.Schema()))
newConds = append(newConds, cond.Decorrelate(outerSchema))
}
apply.AttachOnConds(newConds)
innerPlan = sel.Children()[0]
Expand Down Expand Up @@ -296,9 +298,9 @@ func (s *DecorrelateSolver) optimize(ctx context.Context, p base.LogicalPlan, gr
}
// step2: when it can be substituted all, we then just do the de-correlation (apply conditions included).
for i, expr := range proj.Exprs {
proj.Exprs[i] = expr.Decorrelate(outerPlan.Schema())
proj.Exprs[i] = expr.Decorrelate(outerSchema)
}
apply.Decorrelate(outerPlan.Schema())
apply.Decorrelate(outerSchema)

innerPlan = proj.Children()[0]
apply.SetChildren(outerPlan, innerPlan)
Expand Down
Loading