Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 102 additions & 50 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
Expand Down Expand Up @@ -110,7 +111,14 @@ type IndexMergeReaderExecutor struct {
// fields about accessing partition tables
partitionTableMode bool // if this IndexMerge is accessing a partition table
prunedPartitions []table.PhysicalTable // pruned partition tables need to access
partitionKeyRanges [][][]kv.KeyRange // [partialIndex][partitionIdx][ranges]

// partialWorkerKVRanges stores the pre-built kv ranges for each partial worker. This field unifies the previous
// keyRanges and partitionKeyRanges fields.
// partialWorkerKVRanges[i] is for the i-th partial path, and consists of one or multiple grouped kv ranges (which
// may come from partitions, grouped ranges from IN conditions, or both). Each group is a kvRangesWithPhysicalTblID.
// Note that IndexMergeReaderExecutor only uses this for partial index paths and doesn't rely on this field for
// partial table paths, but memIndexMergeReader needs this, so we still build this field for all partial paths.
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID // partial paths -> grouped kv ranges -> kv ranges

// All fields above are immutable.

Expand All @@ -120,7 +128,6 @@ type IndexMergeReaderExecutor struct {
finished chan struct{}

workerStarted bool
keyRanges [][]kv.KeyRange

resultCh chan *indexMergeTableTask
resultCurr *indexMergeTableTask
Expand Down Expand Up @@ -165,7 +172,6 @@ func (e *IndexMergeReaderExecutor) Table() table.Table {

// Open implements the Executor Open interface
func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
Expand All @@ -177,29 +183,10 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
return err
}

if !e.partitionTableMode {
if e.keyRanges, err = e.buildKeyRangesForTable(e.table); err != nil {
return err
}
} else {
e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes))
tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions))
for i, p := range e.prunedPartitions {
if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
return err
}
}
for i, idx := range e.indexes {
if idx != nil && idx.Global {
keyRange, _ := distsql.IndexRangesToKVRanges(e.sctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i])
e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
} else {
for _, pKeyRanges := range tmpPartitionKeyRanges {
e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i])
}
}
}
if err = e.buildPartialWorkerKVRanges(); err != nil {
return err
}

e.finished = make(chan struct{})
e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
if e.memTracker != nil {
Expand All @@ -222,8 +209,26 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
switch x := plan[0].(type) {
case *physicalop.PhysicalIndexScan:
e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens)
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
case *physicalop.PhysicalTableScan:
e.ranges[i], err = x.ResolveCorrelatedColumns()
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
default:
err = errors.Errorf("unsupported plan type %T", plan[0])
}
Expand All @@ -235,37 +240,84 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
return nil
}

func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
// buildPartialWorkerKVRanges builds the unified partialWorkerKVRanges for all partial workers,
// handling partition mode and grouped ranges (from IN conditions with merge sort) in a unified way.
func (e *IndexMergeReaderExecutor) buildPartialWorkerKVRanges() error {
dctx := e.Ctx().GetDistSQLCtx()
e.partialWorkerKVRanges = make([][]*kvRangesWithPhysicalTblID, len(e.partialPlans))

for i, plan := range e.partialPlans {
_, ok := plan[0].(*physicalop.PhysicalIndexScan)
if !ok {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges)
if err != nil {
return nil, err
}
secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges)
if err != nil {
return nil, err
// Determine grouped ranges: use GroupedRanges from the physical plan if available,
// otherwise wrap the flat ranges as a single group.
var (
groupedRanges [][]*ranger.Range
isIdxScan bool
)
switch x := plan[0].(type) {
case *physicalop.PhysicalIndexScan:
isIdxScan = true
groupedRanges = x.GroupedRanges
case *physicalop.PhysicalTableScan:
groupedRanges = x.GroupedRanges
}
if len(groupedRanges) == 0 {
groupedRanges = [][]*ranger.Range{e.ranges[i]}
}

// Determine the physical tables to scan.
type tblInfo struct {
physTblID int64
isCommonHdl bool
}
tables := make([]tblInfo, 0, max(len(e.prunedPartitions), 1))
if isIdxScan && e.partitionTableMode && e.indexes[i] != nil && e.indexes[i].Global {
tables = []tblInfo{{physTblID: e.table.Meta().ID, isCommonHdl: e.table.Meta().IsCommonHandle}}
} else if e.partitionTableMode {
for _, p := range e.prunedPartitions {
tables = append(tables, tblInfo{physTblID: p.GetPhysicalID(), isCommonHdl: p.Meta().IsCommonHandle})
}
keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...)
ranges = append(ranges, keyRanges)
continue
} else {
tables = []tblInfo{{physTblID: getPhysicalTableID(e.table), isCommonHdl: e.table.Meta().IsCommonHandle}}
}
keyRange, err := distsql.IndexRangesToKVRanges(dctx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i])
if err != nil {
return nil, err

// Build kv ranges for each group × table.
for _, ranges := range groupedRanges {
for _, t := range tables {
if isIdxScan {
kvRange, err := distsql.IndexRangesToKVRanges(dctx, t.physTblID, e.indexes[i].ID, ranges)
if err != nil {
return err
}
e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: kvRange.FirstPartitionRange(),
})
} else {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, false, e.descs[i], t.isCommonHdl)
firstKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, firstPartRanges)
if err != nil {
return err
}
secondKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, secondPartRanges)
if err != nil {
return err
}
combined := append(firstKV.FirstPartitionRange(), secondKV.FirstPartitionRange()...)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-allocate the `combined` slice too.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current implementation is optimal. The compiler should handle this well.

e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: combined,
})
}
}
}
ranges = append(ranges, keyRange.FirstPartitionRange())
}
return ranges, nil
return nil
}

func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
exitCh := make(chan struct{})
workCh := make(chan *indexMergeTableTask, 1)
fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges))
fetchCh := make(chan *indexMergeTableTask, len(e.partialPlans))

e.startIndexMergeProcessWorker(ctx, workCh, fetchCh)

Expand Down Expand Up @@ -337,11 +389,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
e.dagPBs[workID].CollectExecutionSummaries = &collExec
}

var keyRanges [][]kv.KeyRange
if e.partitionTableMode {
keyRanges = e.partitionKeyRanges[workID]
} else {
keyRanges = [][]kv.KeyRange{e.keyRanges[workID]}
keyRanges := make([][]kv.KeyRange, 0, len(e.partialWorkerKVRanges[workID]))
for _, pkr := range e.partialWorkerKVRanges[workID] {
keyRanges = append(keyRanges, pkr.KeyRanges)
}
failpoint.Inject("startPartialIndexWorkerErr", func() error {
return errors.New("inject an error before start partialIndexWorker")
Expand Down Expand Up @@ -504,6 +554,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: ts.ByItems,
groupedRanges: ts.GroupedRanges,
groupByColIdxs: ts.GroupByColIdxs,
}

worker := &partialTableWorker{
Expand Down
27 changes: 8 additions & 19 deletions pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,8 @@ type memIndexMergeReader struct {
memReaders []memReader
isIntersection bool

// partition mode
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables
partitionMode bool // if it is accessing a partition table
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID

keepOrder bool
compareExec
Expand Down Expand Up @@ -873,9 +871,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
memReaders: memReaders,
isIntersection: indexMergeReader.isIntersection,

partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,
partitionMode: indexMergeReader.partitionTableMode,
partialWorkerKVRanges: indexMergeReader.partialWorkerKVRanges,

keepOrder: us.keepOrder,
compareExec: us.compareExec,
Expand Down Expand Up @@ -1048,19 +1045,12 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
hMap := kv.NewHandleMap()
// loop each memReaders and fill handle map
for i, reader := range m.memReaders {
// [partitionNum][rangeNum]
var readerKvRanges [][]kv.KeyRange
if m.partitionMode {
readerKvRanges = m.partitionKVRanges[i]
} else {
readerKvRanges = [][]kv.KeyRange{m.indexMergeReader.keyRanges[i]}
}
for j, kr := range readerKvRanges {
for _, pkr := range m.partialWorkerKVRanges[i] {
switch r := reader.(type) {
case *memTableReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
case *memIndexReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
default:
return nil, errors.New("memReader have to be memTableReader or memIndexReader")
}
Expand All @@ -1071,8 +1061,7 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
// Filter same row.
for _, handle := range handles {
if _, ok := handle.(kv.PartitionHandle); !ok && m.partitionMode {
pid := m.partitionTables[j].GetPhysicalID()
handle = kv.NewPartitionHandle(pid, handle)
handle = kv.NewPartitionHandle(pkr.PhysicalTableID, handle)
}
if v, ok := hMap.Get(handle); !ok {
cnt := 1
Expand Down
18 changes: 12 additions & 6 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,12 @@ func hasV0NewCollationStringHandle(ds *logicalop.DataSource) bool {
}

func matchProperty(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult {
// This function may set the two fields below for the PropMatchedNeedMergeSort case, so we reset them here to
// avoid leaving the AccessPath with an inconsistent state when there are multiple calls to matchProperty with
Comment thread
winoros marked this conversation as resolved.
// different properties.
path.GroupedRanges = nil
path.GroupByColIdxs = nil

if ds.Table.Type().IsClusterTable() && !prop.IsSortItemEmpty() {
// TableScan with cluster table can't keep order.
return property.PropNotMatched
Expand Down Expand Up @@ -1431,8 +1437,7 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if !noSortItem && matchProperty(ds, oneAccessPath, prop) != property.PropMatched {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
}
}
Expand Down Expand Up @@ -1542,8 +1547,7 @@ func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, p
return property.PropNotMatched
}
for _, partialPath := range path.PartialIndexPaths {
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if matchProperty(ds, partialPath, prop) != property.PropMatched {
if !matchProperty(ds, partialPath, prop).Matched() {
return property.PropNotMatched
}
}
Expand Down Expand Up @@ -2325,8 +2329,6 @@ func checkColinSchema(cols []*expression.Column, schema *expression.Schema) bool
}

func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (tablePlan base.PhysicalPlan) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
ts, rowCount := physicalop.GetOriginalPhysicalTableScan(ds, prop, path, matchProp.Matched())
overwritePartialTableScanSchema(ds, ts)
// remove ineffective filter condition after overwriting physicalscan schema
Expand All @@ -2345,6 +2347,10 @@ func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.Physical
ts.SetSchema(tmpSchema)
}
ts.ByItems = byItems
if len(path.GroupedRanges) > 0 {
ts.GroupedRanges = path.GroupedRanges
ts.GroupByColIdxs = path.GroupByColIdxs
}
}
if len(ts.FilterCondition) > 0 {
selectivity, err := cardinality.Selectivity(ds.SCtx(), ds.TableStats.HistColl, ts.FilterCondition, nil)
Expand Down
9 changes: 6 additions & 3 deletions pkg/planner/core/operator/physicalop/physical_index_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,6 @@ func GetOriginalPhysicalIndexScan(ds *logicalop.DataSource, prop *property.Physi

// ConvertToPartialIndexScan converts a DataSource to a PhysicalIndexScan for IndexMerge.
func ConvertToPartialIndexScan(ds *logicalop.DataSource, physPlanPartInfo *PhysPlanPartInfo, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (base.PhysicalPlan, []expression.Expression, error) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
is := GetOriginalPhysicalIndexScan(ds, prop, path, matchProp.Matched(), false)
// TODO: Consider using isIndexCoveringColumns() to avoid another TableRead
indexConds := path.IndexFilters
Expand All @@ -715,8 +713,13 @@ func ConvertToPartialIndexScan(ds *logicalop.DataSource, physPlanPartInfo *PhysP
is.Columns = tmpColumns
is.SetSchema(tmpSchema)
}
// Add sort items for index scan for merge-sort operation between partitions.
// Add sort items for index scan for merge-sort operation between partitions or range groups.
is.ByItems = byItems
// Copy GroupedRanges for merge sort within this partial path (for IN conditions).
if len(path.GroupedRanges) > 0 {
is.GroupedRanges = path.GroupedRanges
is.GroupByColIdxs = path.GroupByColIdxs
}
}

// Add a `Selection` for `IndexScan` with global index.
Expand Down
Loading
Loading