From 0f52dc151a884d3a3ede88bf84041a803c9591c4 Mon Sep 17 00:00:00 2001 From: tpp <146148086+terry1purcell@users.noreply.github.com> Date: Tue, 7 Apr 2026 16:46:49 -0700 Subject: [PATCH] This is an automated cherry-pick of #67482 Signed-off-by: ti-chi-bot --- pkg/planner/core/logical_plan_builder.go | 277 ++++++++++++++++++ .../core/operator/logicalop/logical_cte.go | 14 + pkg/planner/core/rule_decorrelate.go | 21 +- 3 files changed, 309 insertions(+), 3 deletions(-) diff --git a/pkg/planner/core/logical_plan_builder.go b/pkg/planner/core/logical_plan_builder.go index ead149d7bdf80..cfe5b8d951e61 100644 --- a/pkg/planner/core/logical_plan_builder.go +++ b/pkg/planner/core/logical_plan_builder.go @@ -466,9 +466,41 @@ func (b *PlanBuilder) buildResultSetNode(ctx context.Context, node ast.ResultSet return nil, err } +<<<<<<< HEAD for _, name := range p.OutputNames() { if name.Hidden { continue +======= + if x.AsName.L != "" { + // Clone output names before modifying to avoid mutating shared structs. + // This is critical for CTEs whose output names are shared across multiple + // references — in-place mutation would corrupt other consumers. + clonedNames := make([]*types.FieldName, len(p.OutputNames())) + for i, name := range p.OutputNames() { + if name.Hidden { + clonedNames[i] = name + continue + } + // Clone the field name and update table name. + // For derived tables, clear DBName so that error messages (e.g. only_full_group_by) + // show "alias.col" not "db.alias.col". The current-database qualifier needed for + // hint generation (leading()) is set separately on plannerSelectBlockAsName below. + // For base-table aliases (isTableName), inherit DBName for DEFAULT() resolution. + dbName := ast.NewCIStr("") + if isTableName { + dbName = name.DBName + } + clonedNames[i] = &types.FieldName{ + DBName: dbName, + OrigTblName: name.OrigTblName, + OrigColName: name.OrigColName, + TblName: x.AsName, + ColName: name.ColName, + NotExplicitUsable: name.NotExplicitUsable, + Redundant: name.Redundant, + Hidden: name.Hidden, + } +>>>>>>> 72f9da0b023 (planner: lateral join quality updates (#67482)) } if x.AsName.L != "" { name.TblName = x.AsName @@ -556,6 +588,105 @@ func setPreferredStoreType(ds *logicalop.DataSource, hintInfo *h.PlanHints) { } } +<<<<<<< HEAD +======= +// findJoinFullSchema walks through single-child wrapper operators (e.g., LogicalSelection +// created by ON clauses on inner joins) to find an underlying LogicalJoin or LogicalApply +// with FullSchema. This is needed because USING/NATURAL joins set FullSchema on the +// LogicalJoin, but when that join has an ON condition with InnerJoin type, the result is +// wrapped in a LogicalSelection, hiding the FullSchema from direct type assertions. +func findJoinFullSchema(p base.LogicalPlan) (*expression.Schema, types.NameSlice) { + for { + switch x := p.(type) { + case *logicalop.LogicalJoin: + if x.FullSchema != nil { + return x.FullSchema, x.FullNames + } + return nil, nil + case *logicalop.LogicalApply: + if x.FullSchema != nil { + return x.FullSchema, x.FullNames + } + return nil, nil + case *logicalop.LogicalSelection: + // LogicalSelection is a transparent wrapper (e.g., from ON clauses); look through it. + // Do NOT walk through LogicalProjection: it represents a derived table boundary, + // and exposing its inner join's FullSchema would leak inner table aliases. + children := p.Children() + if len(children) != 1 { + return nil, nil + } + p = children[0] + default: + return nil, nil + } + } +} + +// containsLateralTableSource checks if a ResultSetNode contains a LATERAL table source +// anywhere in its subtree. Used only to decide whether to push outerSchemas before +// building the right side, so nested LATERAL sources can resolve outer columns. +func containsLateralTableSource(node ast.ResultSetNode) bool { + switch n := node.(type) { + case *ast.TableSource: + if n.Lateral { + return true + } + // Descend into the inner source (derived table / set-op) so nested + // LATERAL inside a subquery or set-op used as a table source is detected. + return containsLateralTableSource(n.Source) + case *ast.Join: + // For parenthesized single table refs, the parser creates Join{Left: TableSource, Right: nil} + if n.Right == nil { + return containsLateralTableSource(n.Left) + } + // Check both sides for nested LATERAL + return containsLateralTableSource(n.Left) || containsLateralTableSource(n.Right) + case *ast.SelectStmt: + // Descend into the FROM clause of a derived subquery. + if n.From != nil { + return containsLateralTableSource(n.From.TableRefs) + } + return false + case *ast.SetOprStmt: + // Check each operand in the UNION/INTERSECT/EXCEPT list. + if n.SelectList != nil { + for _, sel := range n.SelectList.Selects { + if rs, ok := sel.(ast.ResultSetNode); ok && containsLateralTableSource(rs) { + return true + } + } + } + return false + default: + return false + } +} + +// isImmediateLateralTableSource checks whether the top-level ResultSetNode is itself a +// LATERAL TableSource, without recursing into both sides of a multi-table nested join. +// A parenthesized single-table form (Join{Left: source, Right: nil}) is transparent and +// is unwrapped, but a multi-table join on the right side is not itself a LATERAL source. +// This is used after rightPlan is built to decide whether to produce a LogicalApply: +// only the immediate right operand being LATERAL (or actual correlations in rightPlan) +// should trigger that, not a LATERAL nested deeper in the right subtree. +func isImmediateLateralTableSource(node ast.ResultSetNode) bool { + switch n := node.(type) { + case *ast.TableSource: + return n.Lateral + case *ast.Join: + // Parenthesized single-table ref: parser creates Join{Left: source, Right: nil} + if n.Right == nil { + return isImmediateLateralTableSource(n.Left) + } + // A multi-table join is not itself a single LATERAL source + return false + default: + return false + } +} + +>>>>>>> 72f9da0b023 (planner: lateral join quality updates (#67482)) func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (base.LogicalPlan, error) { // We will construct a "Join" node for some statements like "INSERT", // "DELETE", "UPDATE", "REPLACE". For this scenario "joinNode.Right" is nil @@ -704,6 +835,152 @@ func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (base.L return joinPlan, nil } +<<<<<<< HEAD +======= +// buildLateralJoin builds a LogicalApply for LATERAL derived tables. +// LATERAL makes left-side columns available to the right-side subquery, +// which is semantically equivalent to a correlated subquery. +func (b *PlanBuilder) buildLateralJoin(ctx context.Context, leftPlan, rightPlan base.LogicalPlan, joinNode *ast.Join) (base.LogicalPlan, error) { + // NATURAL JOIN and USING clauses are not supported with LATERAL derived tables. + // These clauses match columns by name between two tables, but LATERAL subqueries + // define their own output column names which typically don't match the left-side table. + if joinNode.NaturalJoin { + return nil, plannererrors.ErrInvalidLateralJoin.GenWithStackByArgs("NATURAL JOIN is not supported with LATERAL") + } + if joinNode.Using != nil { + return nil, plannererrors.ErrInvalidLateralJoin.GenWithStackByArgs("USING clause is not supported with LATERAL") + } + + // Extract correlated columns from right side that reference left side. + // Use FullSchema when leftPlan contains a USING/NATURAL join, so we capture + // correlated columns that reference the redundant (merged) join columns. + // Walk through wrapper operators (e.g., LogicalSelection) to find FullSchema. + outerSchema := leftPlan.Schema() + if fullSchema, _ := findJoinFullSchema(leftPlan); fullSchema != nil { + outerSchema = fullSchema + } + corCols := coreusage.ExtractCorColumnsBySchema4LogicalPlan(rightPlan, outerSchema) + + // Determine join type based on AST. + // Currently supports INNER JOIN and comma syntax (which the parser represents as CrossJoin). + // LEFT/RIGHT JOIN will be added in a follow-up PR. + var joinType base.JoinType + switch joinNode.Tp { + case ast.LeftJoin: + return nil, plannererrors.ErrInvalidLateralJoin.GenWithStackByArgs("LEFT JOIN is not supported with LATERAL") + case ast.RightJoin: + return nil, plannererrors.ErrInvalidLateralJoin.GenWithStackByArgs("RIGHT JOIN is not supported with LATERAL") + default: + // Comma syntax (ast.CrossJoin) and explicit INNER JOIN + joinType = base.InnerJoin + } + + // Build LogicalApply (leveraging existing correlated subquery infrastructure) + // Note: We enable decorrelation optimization, which will attempt to convert the + // nested loop (LogicalApply) into a more efficient join when safe. The decorrelator + // is conservative and will only transform patterns it can prove are semantically equivalent. + // Complex cases (aggregates with correlation, non-deterministic functions, etc.) will + // remain as Apply and use nested loop execution. + b.optFlag = b.optFlag | rule.FlagPredicatePushDown | rule.FlagBuildKeyInfo | rule.FlagDecorrelate | rule.FlagConstantPropagation + + ap := logicalop.LogicalApply{ + LogicalJoin: logicalop.LogicalJoin{JoinType: joinType}, + CorCols: corCols, + NoDecorrelate: false, // Allow decorrelation; optimizer will decide if safe + IsLateral: true, // Mark as LATERAL join to prevent unsafe elimination in PruneColumns + }.Init(b.ctx, b.getSelectOffset()) + + ap.SetChildren(leftPlan, rightPlan) + ap.SetSchema(expression.MergeSchema(leftPlan.Schema(), rightPlan.Schema())) + + // Note: nullability adjustment is not needed for InnerJoin (the only type supported currently). + // When LEFT/RIGHT JOIN support is added, ResetNotNullFlag must be called here. + + // Clone output names to avoid sharing FieldName structs that might be mutated later. + // Do NOT override DBName here: derived-table outputs already carry DBName="" from + // buildResultSetNode, and real-table columns inside a nested right subtree (e.g. + // t2 in "t2 CROSS JOIN LATERAL(...)") must keep their original DBName so that + // schema-qualified references like ORDER BY test.t2.a continue to resolve. + outputNames := make([]*types.FieldName, leftPlan.Schema().Len()+rightPlan.Schema().Len()) + for i, name := range leftPlan.OutputNames() { + outputNames[i] = name.Clone() + } + for i, name := range rightPlan.OutputNames() { + outputNames[leftPlan.Schema().Len()+i] = name.Clone() + } + ap.SetOutputNames(outputNames) + + // Set FullSchema and FullNames for USING clause handling (consistent with buildJoin). + // Even though LATERAL typically doesn't use USING, LogicalApply extends LogicalJoin + // which has these fields, so we should set them for correctness. + var lFullSchema, rFullSchema *expression.Schema + var lFullNames, rFullNames types.NameSlice + if fullSchema, fullNames := findJoinFullSchema(leftPlan); fullSchema != nil { + lFullSchema = fullSchema + lFullNames = fullNames + } else { + lFullSchema = leftPlan.Schema() + lFullNames = leftPlan.OutputNames() + } + // When buildLateralJoin is triggered by condition (b) (correlated columns in a nested + // right subtree), rightPlan may itself be a LogicalJoin or LogicalApply with its own + // FullSchema from a USING/NATURAL join on the right side. Propagate it upward the same + // way buildJoin does for the right child. + if fullSchema, fullNames := findJoinFullSchema(rightPlan); fullSchema != nil { + rFullSchema = fullSchema + rFullNames = fullNames + } else { + rFullSchema = rightPlan.Schema() + rFullNames = rightPlan.OutputNames() + } + + ap.FullSchema = expression.MergeSchema(lFullSchema, rFullSchema) + + // Note: FullSchema nullability adjustment is not needed for InnerJoin. + // When LEFT/RIGHT JOIN support is added, ResetNotNullFlag must be called here. + + ap.FullNames = make([]*types.FieldName, 0, len(lFullNames)+len(rFullNames)) + for _, lName := range lFullNames { + name := *lName + ap.FullNames = append(ap.FullNames, &name) + } + for _, rName := range rFullNames { + name := *rName + ap.FullNames = append(ap.FullNames, &name) + } + + // Mark inner CTEs against FullSchema so correlations via USING/NATURAL + // merged columns are detected and the CTE storage is reset per outer row. + setIsInApplyForCTE(rightPlan, ap.FullSchema) + + // Handle ON conditions if present + if joinNode.On != nil { + b.curClause = onClause + onExpr, newPlan, err := b.rewrite(ctx, joinNode.On.Expr, ap, nil, false) + if err != nil { + return nil, err + } + if newPlan != ap { + return nil, plannererrors.ErrInvalidLateralJoin.GenWithStackByArgs("ON condition contains subqueries") + } + onCondition := expression.SplitCNFItems(onExpr) + ap.AttachOnConds(onCondition) + } + + // Note: nullability reset for outer joins not needed for InnerJoin. + + // Merge handle maps (copied from buildJoin) + handleMap1 := b.handleHelper.popMap() + handleMap2 := b.handleHelper.popMap() + b.handleHelper.mergeAndPush(handleMap1, handleMap2) + + // Set join hints if any + ap.LogicalJoin.SetPreferredJoinTypeAndOrder(b.TableHints()) + + return ap, nil +} + +>>>>>>> 72f9da0b023 (planner: lateral join quality updates (#67482)) // buildUsingClause eliminate the redundant columns and ordering columns based // on the "USING" clause. // diff --git a/pkg/planner/core/operator/logicalop/logical_cte.go b/pkg/planner/core/operator/logicalop/logical_cte.go index 64b4b4d76177a..758a1ac366b5c 100644 --- a/pkg/planner/core/operator/logicalop/logical_cte.go +++ b/pkg/planner/core/operator/logicalop/logical_cte.go @@ -210,6 +210,20 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression } if p.Cte.RecursivePartLogicalPlan != nil { if p.Cte.RecursivePartPhysicalPlan == nil { +<<<<<<< HEAD +======= + // TODO: parallel apply inside a recursive CTE body produces incorrect results + // (grandchildren are silently dropped) because the CTE iteration model shares + // mutable state (the working-table buffer) across goroutines, causing rows from + // deeper recursion levels to be lost. Disable parallel apply for the recursive + // body until the executor is fixed to handle this safely. + // See: TestLateralHierarchyParallelApply (flat query verifies concurrency > 1 + // for non-recursive LATERAL; recursive correctness is tracked separately). + vars := p.SCtx().GetSessionVars() + savedParallelApply := vars.EnableParallelApply + vars.EnableParallelApply = false + defer func() { vars.EnableParallelApply = savedParallelApply }() +>>>>>>> 72f9da0b023 (planner: lateral join quality updates (#67482)) _, p.Cte.RecursivePartPhysicalPlan, _, err = utilfuncp.DoOptimize(context.TODO(), p.SCtx(), p.Cte.OptFlag, p.Cte.RecursivePartLogicalPlan) if err != nil { return nil, err diff --git a/pkg/planner/core/rule_decorrelate.go b/pkg/planner/core/rule_decorrelate.go index ab2d8c0f031c0..d2dbaba87bd5a 100644 --- a/pkg/planner/core/rule_decorrelate.go +++ b/pkg/planner/core/rule_decorrelate.go @@ -139,7 +139,22 @@ func (s *DecorrelateSolver) Optimize(ctx context.Context, p base.LogicalPlan, op if apply, ok := p.(*logicalop.LogicalApply); ok { outerPlan := apply.Children()[0] innerPlan := apply.Children()[1] +<<<<<<< HEAD apply.CorCols = coreusage.ExtractCorColumnsBySchema4LogicalPlan(apply.Children()[1], apply.Children()[0].Schema()) +======= + // Use FullSchema when outer plan is a USING/NATURAL join, so we capture + // correlated columns that reference the redundant (merged) join columns. + // Walk through wrapper operators (e.g., LogicalSelection from ON clauses) + // to find the underlying LogicalJoin, matching the schema used for name + // resolution in LATERAL subqueries (see logical_plan_builder.go buildJoin). + outerSchema := outerPlan.Schema() + if apply.IsLateral { + if fullSchema, _ := findJoinFullSchema(outerPlan); fullSchema != nil { + outerSchema = fullSchema + } + } + apply.CorCols = coreusage.ExtractCorColumnsBySchema4LogicalPlan(innerPlan, outerSchema) +>>>>>>> 72f9da0b023 (planner: lateral join quality updates (#67482)) if len(apply.CorCols) == 0 { // If the inner plan is non-correlated, the apply will be simplified to join. join := &apply.LogicalJoin @@ -154,7 +169,7 @@ func (s *DecorrelateSolver) Optimize(ctx context.Context, p base.LogicalPlan, op // Notice that no matter what kind of join is, it's always right. newConds := make([]expression.Expression, 0, len(sel.Conditions)) for _, cond := range sel.Conditions { - newConds = append(newConds, cond.Decorrelate(outerPlan.Schema())) + newConds = append(newConds, cond.Decorrelate(outerSchema)) } apply.AttachOnConds(newConds) innerPlan = sel.Children()[0] @@ -194,9 +209,9 @@ func (s *DecorrelateSolver) Optimize(ctx context.Context, p base.LogicalPlan, op } // step2: when it can be substituted all, we then just do the de-correlation (apply conditions included). for i, expr := range proj.Exprs { - proj.Exprs[i] = expr.Decorrelate(outerPlan.Schema()) + proj.Exprs[i] = expr.Decorrelate(outerSchema) } - apply.Decorrelate(outerPlan.Schema()) + apply.Decorrelate(outerSchema) innerPlan = proj.Children()[0] apply.SetChildren(outerPlan, innerPlan)