-
Notifications
You must be signed in to change notification settings - Fork 6.2k
planner, executor: support merge sort for IN conditions in IndexMerge partial paths #67771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
81851b0
86b4c8f
bdcdb56
4d41538
b9460ec
10e14a7
5d0cba0
a05d8c0
de9f51b
5a6eb67
c9d0dda
954faa0
93a8286
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ import ( | |
| "github.com/pingcap/tidb/pkg/meta/model" | ||
| "github.com/pingcap/tidb/pkg/parser/mysql" | ||
| "github.com/pingcap/tidb/pkg/parser/terror" | ||
| plannercore "github.com/pingcap/tidb/pkg/planner/core" | ||
| "github.com/pingcap/tidb/pkg/planner/core/base" | ||
| "github.com/pingcap/tidb/pkg/planner/core/operator/physicalop" | ||
| plannerutil "github.com/pingcap/tidb/pkg/planner/util" | ||
|
|
@@ -110,7 +111,14 @@ type IndexMergeReaderExecutor struct { | |
| // fields about accessing partition tables | ||
| partitionTableMode bool // if this IndexMerge is accessing a partition table | ||
| prunedPartitions []table.PhysicalTable // pruned partition tables need to access | ||
| partitionKeyRanges [][][]kv.KeyRange // [partialIndex][partitionIdx][ranges] | ||
|
|
||
| // partialWorkerKVRanges stores the pre-built kv ranges for each partial worker. This field unifies the previous | ||
| // keyRanges and partitionKeyRanges fields. | ||
| // partialWorkerKVRanges[i] is for the i-th partial path, and consists of one or multiple grouped kv ranges (which | ||
| // may come from partitions, grouped ranges from IN conditions, or both). Each group is a kvRangesWithPhysicalTblID. | ||
| // Note that IndexMergeReaderExecutor only uses this for partial index paths and doesn't rely on this field for | ||
| // partial table paths, but memIndexMergeReader needs this, so we still build this field for all partial paths. | ||
| partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID // partial paths -> grouped kv ranges -> kv ranges | ||
|
|
||
| // All fields above are immutable. | ||
|
|
||
|
|
@@ -120,7 +128,6 @@ type IndexMergeReaderExecutor struct { | |
| finished chan struct{} | ||
|
|
||
| workerStarted bool | ||
| keyRanges [][]kv.KeyRange | ||
|
|
||
| resultCh chan *indexMergeTableTask | ||
| resultCurr *indexMergeTableTask | ||
|
|
@@ -165,7 +172,6 @@ func (e *IndexMergeReaderExecutor) Table() table.Table { | |
|
|
||
| // Open implements the Executor Open interface | ||
| func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) { | ||
| e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans)) | ||
| e.initRuntimeStats() | ||
| if e.isCorColInTableFilter { | ||
| e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans) | ||
|
|
@@ -177,29 +183,10 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) { | |
| return err | ||
| } | ||
|
|
||
| if !e.partitionTableMode { | ||
| if e.keyRanges, err = e.buildKeyRangesForTable(e.table); err != nil { | ||
| return err | ||
| } | ||
| } else { | ||
| e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes)) | ||
| tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions)) | ||
| for i, p := range e.prunedPartitions { | ||
| if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| for i, idx := range e.indexes { | ||
| if idx != nil && idx.Global { | ||
| keyRange, _ := distsql.IndexRangesToKVRanges(e.sctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i]) | ||
| e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()} | ||
| } else { | ||
| for _, pKeyRanges := range tmpPartitionKeyRanges { | ||
| e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i]) | ||
| } | ||
| } | ||
| } | ||
| if err = e.buildPartialWorkerKVRanges(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| e.finished = make(chan struct{}) | ||
| e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) | ||
| if e.memTracker != nil { | ||
|
|
@@ -222,8 +209,26 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) { | |
| switch x := plan[0].(type) { | ||
| case *physicalop.PhysicalIndexScan: | ||
| e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if len(x.GroupByColIdxs) > 0 { | ||
| x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| case *physicalop.PhysicalTableScan: | ||
| e.ranges[i], err = x.ResolveCorrelatedColumns() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if len(x.GroupByColIdxs) > 0 { | ||
| x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| default: | ||
| err = errors.Errorf("unsupported plan type %T", plan[0]) | ||
| } | ||
|
|
@@ -235,37 +240,84 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) { | |
| return nil | ||
| } | ||
|
|
||
| func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) { | ||
| // buildPartialWorkerKVRanges builds the unified partialWorkerKVRanges for all partial workers, | ||
| // handling partition mode and grouped ranges (from IN conditions with merge sort) in a unified way. | ||
| func (e *IndexMergeReaderExecutor) buildPartialWorkerKVRanges() error { | ||
| dctx := e.Ctx().GetDistSQLCtx() | ||
| e.partialWorkerKVRanges = make([][]*kvRangesWithPhysicalTblID, len(e.partialPlans)) | ||
|
|
||
| for i, plan := range e.partialPlans { | ||
| _, ok := plan[0].(*physicalop.PhysicalIndexScan) | ||
| if !ok { | ||
| firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle) | ||
| firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges) | ||
| if err != nil { | ||
| return nil, err | ||
| // Determine grouped ranges: use GroupedRanges from the physical plan if available, | ||
| // otherwise wrap the flat ranges as a single group. | ||
| var ( | ||
| groupedRanges [][]*ranger.Range | ||
| isIdxScan bool | ||
| ) | ||
| switch x := plan[0].(type) { | ||
| case *physicalop.PhysicalIndexScan: | ||
| isIdxScan = true | ||
| groupedRanges = x.GroupedRanges | ||
| case *physicalop.PhysicalTableScan: | ||
| groupedRanges = x.GroupedRanges | ||
| } | ||
| if len(groupedRanges) == 0 { | ||
| groupedRanges = [][]*ranger.Range{e.ranges[i]} | ||
| } | ||
|
|
||
| // Determine the physical tables to scan. | ||
| type tblInfo struct { | ||
| physTblID int64 | ||
| isCommonHdl bool | ||
| } | ||
| var tables []tblInfo | ||
| if isIdxScan && e.partitionTableMode && e.indexes[i] != nil && e.indexes[i].Global { | ||
| tables = []tblInfo{{physTblID: e.table.Meta().ID, isCommonHdl: e.table.Meta().IsCommonHandle}} | ||
| } else if e.partitionTableMode { | ||
| for _, p := range e.prunedPartitions { | ||
| tables = append(tables, tblInfo{physTblID: p.GetPhysicalID(), isCommonHdl: p.Meta().IsCommonHandle}) | ||
| } | ||
| keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...) | ||
| ranges = append(ranges, keyRanges) | ||
| continue | ||
| } else { | ||
| tables = []tblInfo{{physTblID: getPhysicalTableID(e.table), isCommonHdl: e.table.Meta().IsCommonHandle}} | ||
| } | ||
| keyRange, err := distsql.IndexRangesToKVRanges(dctx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i]) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
||
| // Build kv ranges for each group × table. | ||
| for _, ranges := range groupedRanges { | ||
| for _, t := range tables { | ||
| if isIdxScan { | ||
| kvRange, err := distsql.IndexRangesToKVRanges(dctx, t.physTblID, e.indexes[i].ID, ranges) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{ | ||
| PhysicalTableID: t.physTblID, | ||
| KeyRanges: kvRange.FirstPartitionRange(), | ||
| }) | ||
| } else { | ||
| firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, false, e.descs[i], t.isCommonHdl) | ||
| firstKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, firstPartRanges) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| secondKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, secondPartRanges) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| combined := append(firstKV.FirstPartitionRange(), secondKV.FirstPartitionRange()...) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. pre-alloc for the `combined` slice?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the current implementation is optimal. The compiler should handle this well. |
||
| e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{ | ||
| PhysicalTableID: t.physTblID, | ||
| KeyRanges: combined, | ||
| }) | ||
| } | ||
| } | ||
| } | ||
| ranges = append(ranges, keyRange.FirstPartitionRange()) | ||
| } | ||
| return ranges, nil | ||
| return nil | ||
| } | ||
|
|
||
| func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error { | ||
| exitCh := make(chan struct{}) | ||
| workCh := make(chan *indexMergeTableTask, 1) | ||
| fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges)) | ||
| fetchCh := make(chan *indexMergeTableTask, len(e.partialPlans)) | ||
|
|
||
| e.startIndexMergeProcessWorker(ctx, workCh, fetchCh) | ||
|
|
||
|
|
@@ -337,11 +389,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, | |
| e.dagPBs[workID].CollectExecutionSummaries = &collExec | ||
| } | ||
|
|
||
| var keyRanges [][]kv.KeyRange | ||
| if e.partitionTableMode { | ||
| keyRanges = e.partitionKeyRanges[workID] | ||
| } else { | ||
| keyRanges = [][]kv.KeyRange{e.keyRanges[workID]} | ||
| keyRanges := make([][]kv.KeyRange, 0, len(e.partialWorkerKVRanges[workID])) | ||
| for _, pkr := range e.partialWorkerKVRanges[workID] { | ||
| keyRanges = append(keyRanges, pkr.KeyRanges) | ||
| } | ||
| failpoint.Inject("startPartialIndexWorkerErr", func() error { | ||
| return errors.New("inject an error before start partialIndexWorker") | ||
|
|
@@ -504,6 +554,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, | |
| netDataSize: e.partialNetDataSizes[workID], | ||
| keepOrder: ts.KeepOrder, | ||
| byItems: ts.ByItems, | ||
| groupedRanges: ts.GroupedRanges, | ||
| groupByColIdxs: ts.GroupByColIdxs, | ||
| } | ||
|
|
||
| worker := &partialTableWorker{ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1052,6 +1052,12 @@ func hasV0NewCollationStringHandle(ds *logicalop.DataSource) bool { | |
| } | ||
|
|
||
| func matchProperty(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult { | ||
| // This function may set the two fields below for the PropMatchedNeedMergeSort case, so we reset them here to | ||
| // avoid leaving the AccessPath with an inconsistent state when there are multiple calls to matchProperty with | ||
|
winoros marked this conversation as resolved.
|
||
| // different properties. | ||
| path.GroupedRanges = nil | ||
| path.GroupByColIdxs = nil | ||
|
|
||
| if ds.Table.Type().IsClusterTable() && !prop.IsSortItemEmpty() { | ||
| // TableScan with cluster table can't keep order. | ||
| return property.PropNotMatched | ||
|
|
@@ -1431,8 +1437,7 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc | |
| // if there is some sort items and this path doesn't match this prop, continue. | ||
| match := true | ||
| for _, oneAccessPath := range oneAlternative { | ||
| // Satisfying the property by a merge sort is not supported for partial paths of index merge. | ||
| if !noSortItem && matchProperty(ds, oneAccessPath, prop) != property.PropMatched { | ||
| if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() { | ||
| match = false | ||
|
Comment on lines
1439
to
1441
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Preserve the exact partial-path match result instead of collapsing it to `Matched()`. This drops the distinction between `property.PropMatched` and `property.PropMatchedNeedMergeSort`. Also applies to: 1543-1545 🤖 Prompt for AI Agents |
||
| } | ||
| } | ||
|
|
@@ -1542,8 +1547,7 @@ func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, p | |
| return property.PropNotMatched | ||
| } | ||
| for _, partialPath := range path.PartialIndexPaths { | ||
| // Satisfying the property by a merge sort is not supported for partial paths of index merge. | ||
| if matchProperty(ds, partialPath, prop) != property.PropMatched { | ||
| if !matchProperty(ds, partialPath, prop).Matched() { | ||
| return property.PropNotMatched | ||
| } | ||
| } | ||
|
|
@@ -2325,8 +2329,6 @@ func checkColinSchema(cols []*expression.Column, schema *expression.Schema) bool | |
| } | ||
|
|
||
| func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (tablePlan base.PhysicalPlan) { | ||
| intest.Assert(matchProp != property.PropMatchedNeedMergeSort, | ||
| "partial paths of index merge path should not match property using merge sort") | ||
| ts, rowCount := physicalop.GetOriginalPhysicalTableScan(ds, prop, path, matchProp.Matched()) | ||
| overwritePartialTableScanSchema(ds, ts) | ||
| // remove ineffective filter condition after overwriting physicalscan schema | ||
|
|
@@ -2345,6 +2347,10 @@ func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.Physical | |
| ts.SetSchema(tmpSchema) | ||
| } | ||
| ts.ByItems = byItems | ||
| if len(path.GroupedRanges) > 0 { | ||
| ts.GroupedRanges = path.GroupedRanges | ||
| ts.GroupByColIdxs = path.GroupByColIdxs | ||
| } | ||
| } | ||
| if len(ts.FilterCondition) > 0 { | ||
| selectivity, err := cardinality.Selectivity(ds.SCtx(), ds.TableStats.HistColl, ts.FilterCondition, nil) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we pre-allocate the memory for the slice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.