Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 97 additions & 50 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
Expand Down Expand Up @@ -110,7 +111,14 @@ type IndexMergeReaderExecutor struct {
// fields about accessing partition tables
partitionTableMode bool // if this IndexMerge is accessing a partition table
prunedPartitions []table.PhysicalTable // pruned partition tables need to access
partitionKeyRanges [][][]kv.KeyRange // [partialIndex][partitionIdx][ranges]

// partialWorkerKVRanges stores the pre-built kv ranges for each partial worker. This field unifies the previous
// keyRanges and partitionKeyRanges fields.
// partialWorkerKVRanges[i] is for the i-th partial plan, and consists of multiple grouped kv ranges (which may come
// from partitions, grouped ranges from IN conditions, or both). Each group is a kvRangesWithPhysicalTblID.
// Note that IndexMergeReaderExecutor doesn't rely on this field for partial table paths, but memIndexMergeReader
// needs this, so we still build this field for all partial paths.
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID // partial paths -> grouped kv ranges -> kv ranges

// All fields above are immutable.

Expand All @@ -120,7 +128,6 @@ type IndexMergeReaderExecutor struct {
finished chan struct{}

workerStarted bool
keyRanges [][]kv.KeyRange

resultCh chan *indexMergeTableTask
resultCurr *indexMergeTableTask
Expand Down Expand Up @@ -165,7 +172,6 @@ func (e *IndexMergeReaderExecutor) Table() table.Table {

// Open implements the Executor Open interface
func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
Expand All @@ -177,29 +183,10 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
return err
}

if !e.partitionTableMode {
if e.keyRanges, err = e.buildKeyRangesForTable(e.table); err != nil {
return err
}
} else {
e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes))
tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions))
for i, p := range e.prunedPartitions {
if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
return err
}
}
for i, idx := range e.indexes {
if idx != nil && idx.Global {
keyRange, _ := distsql.IndexRangesToKVRanges(e.sctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i])
e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
} else {
for _, pKeyRanges := range tmpPartitionKeyRanges {
e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i])
}
}
}
if err = e.buildPartialWorkerKVRanges(); err != nil {
return err
}

e.finished = make(chan struct{})
e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
if e.memTracker != nil {
Expand All @@ -222,8 +209,26 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
switch x := plan[0].(type) {
case *physicalop.PhysicalIndexScan:
e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens)
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
case *physicalop.PhysicalTableScan:
e.ranges[i], err = x.ResolveCorrelatedColumns()
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
default:
err = errors.Errorf("unsupported plan type %T", plan[0])
}
Expand All @@ -235,37 +240,79 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
return nil
}

func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
// buildPartialWorkerKVRanges builds the unified partialWorkerKVRanges for all partial workers,
// handling partition mode and grouped ranges (from IN conditions with merge sort) in a unified way.
func (e *IndexMergeReaderExecutor) buildPartialWorkerKVRanges() error {
dctx := e.Ctx().GetDistSQLCtx()
e.partialWorkerKVRanges = make([][]*kvRangesWithPhysicalTblID, len(e.partialPlans))

for i, plan := range e.partialPlans {
_, ok := plan[0].(*physicalop.PhysicalIndexScan)
if !ok {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges)
if err != nil {
return nil, err
}
secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges)
if err != nil {
return nil, err
// Determine grouped ranges: use GroupedRanges from the physical plan if available,
// otherwise wrap the flat ranges as a single group.
var groupedRanges [][]*ranger.Range
var isIdxScan bool
if x, ok := plan[0].(*physicalop.PhysicalIndexScan); ok {
isIdxScan = true
groupedRanges = x.GroupedRanges
}
if groupedRanges == nil {
groupedRanges = [][]*ranger.Range{e.ranges[i]}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

// Determine the physical tables to scan.
type tblInfo struct {
physTblID int64
isCommonHdl bool
}
var tables []tblInfo
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pre-allocate the memory for the slice?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

if isIdxScan && e.partitionTableMode && e.indexes[i] != nil && e.indexes[i].Global {
tables = []tblInfo{{physTblID: e.table.Meta().ID, isCommonHdl: e.table.Meta().IsCommonHandle}}
} else if e.partitionTableMode {
for _, p := range e.prunedPartitions {
tables = append(tables, tblInfo{physTblID: p.GetPhysicalID(), isCommonHdl: p.Meta().IsCommonHandle})
}
keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...)
ranges = append(ranges, keyRanges)
continue
} else {
tables = []tblInfo{{physTblID: getPhysicalTableID(e.table), isCommonHdl: e.table.Meta().IsCommonHandle}}
}
keyRange, err := distsql.IndexRangesToKVRanges(dctx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i])
if err != nil {
return nil, err

// Build kv ranges for each group × table.
for _, ranges := range groupedRanges {
for _, t := range tables {
if isIdxScan {
kvRange, err := distsql.IndexRangesToKVRanges(dctx, t.physTblID, e.indexes[i].ID, ranges)
if err != nil {
return err
}
e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: kvRange.FirstPartitionRange(),
})
} else {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, false, e.descs[i], t.isCommonHdl)
firstKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, firstPartRanges)
if err != nil {
return err
}
secondKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, secondPartRanges)
if err != nil {
return err
}
combined := append(firstKV.FirstPartitionRange(), secondKV.FirstPartitionRange()...)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-alloc for the combined too.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current implementation is optimal. The compiler should handle this well.

e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: combined,
})
}
}
}
ranges = append(ranges, keyRange.FirstPartitionRange())
}
return ranges, nil
return nil
}

func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
exitCh := make(chan struct{})
workCh := make(chan *indexMergeTableTask, 1)
fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges))
fetchCh := make(chan *indexMergeTableTask, len(e.partialPlans))

e.startIndexMergeProcessWorker(ctx, workCh, fetchCh)

Expand Down Expand Up @@ -337,11 +384,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
e.dagPBs[workID].CollectExecutionSummaries = &collExec
}

var keyRanges [][]kv.KeyRange
if e.partitionTableMode {
keyRanges = e.partitionKeyRanges[workID]
} else {
keyRanges = [][]kv.KeyRange{e.keyRanges[workID]}
keyRanges := make([][]kv.KeyRange, 0, len(e.partialWorkerKVRanges[workID]))
for _, pkr := range e.partialWorkerKVRanges[workID] {
keyRanges = append(keyRanges, pkr.KeyRanges)
}
failpoint.Inject("startPartialIndexWorkerErr", func() error {
return errors.New("inject an error before start partialIndexWorker")
Expand Down Expand Up @@ -504,6 +549,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: ts.ByItems,
groupedRanges: ts.GroupedRanges,
groupByColIdxs: ts.GroupByColIdxs,
}

worker := &partialTableWorker{
Expand Down
27 changes: 8 additions & 19 deletions pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,8 @@ type memIndexMergeReader struct {
memReaders []memReader
isIntersection bool

// partition mode
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables
partitionMode bool // if it is accessing a partition table
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID

keepOrder bool
compareExec
Expand Down Expand Up @@ -873,9 +871,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
memReaders: memReaders,
isIntersection: indexMergeReader.isIntersection,

partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,
partitionMode: indexMergeReader.partitionTableMode,
partialWorkerKVRanges: indexMergeReader.partialWorkerKVRanges,

keepOrder: us.keepOrder,
compareExec: us.compareExec,
Expand Down Expand Up @@ -1048,19 +1045,12 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
hMap := kv.NewHandleMap()
// loop each memReaders and fill handle map
for i, reader := range m.memReaders {
// [partitionNum][rangeNum]
var readerKvRanges [][]kv.KeyRange
if m.partitionMode {
readerKvRanges = m.partitionKVRanges[i]
} else {
readerKvRanges = [][]kv.KeyRange{m.indexMergeReader.keyRanges[i]}
}
for j, kr := range readerKvRanges {
for _, pkr := range m.partialWorkerKVRanges[i] {
switch r := reader.(type) {
case *memTableReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
case *memIndexReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
default:
return nil, errors.New("memReader have to be memTableReader or memIndexReader")
}
Expand All @@ -1071,8 +1061,7 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
// Filter same row.
for _, handle := range handles {
if _, ok := handle.(kv.PartitionHandle); !ok && m.partitionMode {
pid := m.partitionTables[j].GetPhysicalID()
handle = kv.NewPartitionHandle(pid, handle)
handle = kv.NewPartitionHandle(pkr.PhysicalTableID, handle)
}
if v, ok := hMap.Get(handle); !ok {
cnt := 1
Expand Down
18 changes: 12 additions & 6 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,12 @@ func hasV0NewCollationStringHandle(ds *logicalop.DataSource) bool {
}

func matchProperty(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult {
// This function may set the two fields below for the PropMatchedNeedMergeSort case, so we reset them here to
// avoid leaving the AccessPath with an inconsistent state when there are multiple calls to matchProperty with
Comment thread
winoros marked this conversation as resolved.
// different properties.
path.GroupedRanges = nil
path.GroupByColIdxs = nil

if ds.Table.Type().IsClusterTable() && !prop.IsSortItemEmpty() {
// TableScan with cluster table can't keep order.
return property.PropNotMatched
Expand Down Expand Up @@ -1431,8 +1437,7 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if !noSortItem && matchProperty(ds, oneAccessPath, prop) != property.PropMatched {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
Comment on lines 1439 to 1441
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve the exact partial-path match result instead of collapsing it to .Matched().

This drops the distinction between PropMatched and PropMatchedNeedMergeSort, but the later partial-scan conversion recovers that distinction from AccessPath.GroupedRanges / GroupByColIdxs. Those fields are mutable planner scratch state, and matchProperty() only populates them on the merge-sort path; it does not clear them on plain matches in the code shown here. If the same AccessPath is reused across property explorations, stale grouping metadata can leak into a later direct-match plan and force unnecessary grouped cop tasks / merge-sort. Please either clear the grouping fields whenever the current match is not PropMatchedNeedMergeSort, or carry the per-partial PhysicalPropMatchResult through instead of reducing it to a bool.

Also applies to: 1543-1545

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/planner/core/find_best_task.go` around lines 1433 - 1435, The code
currently reduces the detailed result from matchProperty(...) to a boolean via
.Matched(), losing the PropMatched vs PropMatchedNeedMergeSort distinction and
allowing stale AccessPath.GroupedRanges/GroupByColIdxs to leak; update the logic
in the loop that iterates oneAlternative to either (a) preserve the full
PhysicalPropMatchResult returned by matchProperty(...) and use its Matched/Kind
to decide match, or (b) after calling matchProperty(...) and finding it is NOT
PropMatchedNeedMergeSort, explicitly clear the mutable grouping fields on the
AccessPath (AccessPath.GroupedRanges and AccessPath.GroupByColIdxs) so stale
grouping metadata cannot affect subsequent direct-match plans; apply the same
fix to the similar block around match checks at the other location mentioned
(the block around lines referenced as 1543-1545).

}
}
Expand Down Expand Up @@ -1542,8 +1547,7 @@ func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, p
return property.PropNotMatched
}
for _, partialPath := range path.PartialIndexPaths {
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if matchProperty(ds, partialPath, prop) != property.PropMatched {
if !matchProperty(ds, partialPath, prop).Matched() {
return property.PropNotMatched
}
}
Expand Down Expand Up @@ -2325,8 +2329,6 @@ func checkColinSchema(cols []*expression.Column, schema *expression.Schema) bool
}

func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (tablePlan base.PhysicalPlan) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
ts, rowCount := physicalop.GetOriginalPhysicalTableScan(ds, prop, path, matchProp.Matched())
overwritePartialTableScanSchema(ds, ts)
// remove ineffective filter condition after overwriting physicalscan schema
Expand All @@ -2345,6 +2347,10 @@ func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.Physical
ts.SetSchema(tmpSchema)
}
ts.ByItems = byItems
if len(path.GroupedRanges) > 0 {
ts.GroupedRanges = path.GroupedRanges
ts.GroupByColIdxs = path.GroupByColIdxs
}
}
if len(ts.FilterCondition) > 0 {
selectivity, err := cardinality.Selectivity(ds.SCtx(), ds.TableStats.HistColl, ts.FilterCondition, nil)
Expand Down
9 changes: 6 additions & 3 deletions pkg/planner/core/operator/physicalop/physical_index_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,6 @@ func GetOriginalPhysicalIndexScan(ds *logicalop.DataSource, prop *property.Physi

// ConvertToPartialIndexScan converts a DataSource to a PhysicalIndexScan for IndexMerge.
func ConvertToPartialIndexScan(ds *logicalop.DataSource, physPlanPartInfo *PhysPlanPartInfo, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (base.PhysicalPlan, []expression.Expression, error) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
is := GetOriginalPhysicalIndexScan(ds, prop, path, matchProp.Matched(), false)
// TODO: Consider using isIndexCoveringColumns() to avoid another TableRead
indexConds := path.IndexFilters
Expand All @@ -715,8 +713,13 @@ func ConvertToPartialIndexScan(ds *logicalop.DataSource, physPlanPartInfo *PhysP
is.Columns = tmpColumns
is.SetSchema(tmpSchema)
}
// Add sort items for index scan for merge-sort operation between partitions.
// Add sort items for index scan for merge-sort operation between partitions or range groups.
is.ByItems = byItems
// Copy GroupedRanges for merge sort within this partial path (for IN conditions).
if len(path.GroupedRanges) > 0 {
is.GroupedRanges = path.GroupedRanges
is.GroupByColIdxs = path.GroupByColIdxs
}
}

// Add a `Selection` for `IndexScan` with global index.
Expand Down
Loading
Loading