Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 101 additions & 50 deletions pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ type IndexMergeReaderExecutor struct {
// fields about accessing partition tables
partitionTableMode bool // if this IndexMerge is accessing a partition table
prunedPartitions []table.PhysicalTable // pruned partition tables need to access
partitionKeyRanges [][][]kv.KeyRange // [partialIndex][partitionIdx][ranges]

// partialWorkerKVRanges stores the pre-built kv ranges for each partial worker. This field unifies the previous
// keyRanges and partitionKeyRanges fields.
// partialWorkerKVRanges[i] is for the i-th partial path, and consists of one or multiple grouped kv ranges (which
// may come from partitions, grouped ranges from IN conditions, or both). Each group is a kvRangesWithPhysicalTblID.
// Note that IndexMergeReaderExecutor only uses this for partial index paths and doesn't rely on this field for
// partial table paths, but memIndexMergeReader needs this, so we still build this field for all partial paths.
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID // partial paths -> grouped kv ranges -> kv ranges

// All fields above are immutable.

Expand All @@ -120,7 +127,6 @@ type IndexMergeReaderExecutor struct {
finished chan struct{}

workerStarted bool
keyRanges [][]kv.KeyRange

resultCh chan *indexMergeTableTask
resultCurr *indexMergeTableTask
Expand Down Expand Up @@ -165,7 +171,6 @@ func (e *IndexMergeReaderExecutor) Table() table.Table {

// Open implements the Executor Open interface
func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetBuildPBCtx(), e.tblPlans)
Expand All @@ -177,29 +182,10 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
return err
}

if !e.partitionTableMode {
if e.keyRanges, err = e.buildKeyRangesForTable(e.table); err != nil {
return err
}
} else {
e.partitionKeyRanges = make([][][]kv.KeyRange, len(e.indexes))
tmpPartitionKeyRanges := make([][][]kv.KeyRange, len(e.prunedPartitions))
for i, p := range e.prunedPartitions {
if tmpPartitionKeyRanges[i], err = e.buildKeyRangesForTable(p); err != nil {
return err
}
}
for i, idx := range e.indexes {
if idx != nil && idx.Global {
keyRange, _ := distsql.IndexRangesToKVRanges(e.sctx.GetDistSQLCtx(), e.table.Meta().ID, idx.ID, e.ranges[i])
e.partitionKeyRanges[i] = [][]kv.KeyRange{keyRange.FirstPartitionRange()}
} else {
for _, pKeyRanges := range tmpPartitionKeyRanges {
e.partitionKeyRanges[i] = append(e.partitionKeyRanges[i], pKeyRanges[i])
}
}
}
if err = e.buildPartialWorkerKVRanges(); err != nil {
return err
}

e.finished = make(chan struct{})
e.resultCh = make(chan *indexMergeTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
if e.memTracker != nil {
Expand All @@ -222,8 +208,26 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
switch x := plan[0].(type) {
case *plannercore.PhysicalIndexScan:
e.ranges[i], err = rebuildIndexRanges(e.Ctx().GetExprCtx(), e.Ctx().GetRangerCtx(), x, x.IdxCols, x.IdxColLens)
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
case *plannercore.PhysicalTableScan:
e.ranges[i], err = x.ResolveCorrelatedColumns()
if err != nil {
return err
}
if len(x.GroupByColIdxs) > 0 {
x.GroupedRanges, err = plannercore.GroupRangesByCols(e.ranges[i], x.GroupByColIdxs)
if err != nil {
return err
}
}
default:
err = errors.Errorf("unsupported plan type %T", plan[0])
}
Expand All @@ -235,37 +239,84 @@ func (e *IndexMergeReaderExecutor) rebuildRangeForCorCol() (err error) {
return nil
}

func (e *IndexMergeReaderExecutor) buildKeyRangesForTable(tbl table.Table) (ranges [][]kv.KeyRange, err error) {
// buildPartialWorkerKVRanges builds the unified partialWorkerKVRanges for all partial workers,
// handling partition mode and grouped ranges (from IN conditions with merge sort) in a unified way.
func (e *IndexMergeReaderExecutor) buildPartialWorkerKVRanges() error {
dctx := e.Ctx().GetDistSQLCtx()
e.partialWorkerKVRanges = make([][]*kvRangesWithPhysicalTblID, len(e.partialPlans))

for i, plan := range e.partialPlans {
_, ok := plan[0].(*plannercore.PhysicalIndexScan)
if !ok {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(e.ranges[i], false, e.descs[i], tbl.Meta().IsCommonHandle)
firstKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, firstPartRanges)
if err != nil {
return nil, err
}
secondKeyRanges, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{getPhysicalTableID(tbl)}, tbl.Meta().IsCommonHandle, secondPartRanges)
if err != nil {
return nil, err
// Determine grouped ranges: use GroupedRanges from the physical plan if available,
// otherwise wrap the flat ranges as a single group.
var (
groupedRanges [][]*ranger.Range
isIdxScan bool
)
switch x := plan[0].(type) {
case *plannercore.PhysicalIndexScan:
isIdxScan = true
groupedRanges = x.GroupedRanges
case *plannercore.PhysicalTableScan:
groupedRanges = x.GroupedRanges
}
if len(groupedRanges) == 0 {
groupedRanges = [][]*ranger.Range{e.ranges[i]}
}

// Determine the physical tables to scan.
type tblInfo struct {
physTblID int64
isCommonHdl bool
}
tables := make([]tblInfo, 0, max(len(e.prunedPartitions), 1))
if isIdxScan && e.partitionTableMode && e.indexes[i] != nil && e.indexes[i].Global {
tables = []tblInfo{{physTblID: e.table.Meta().ID, isCommonHdl: e.table.Meta().IsCommonHandle}}
} else if e.partitionTableMode {
for _, p := range e.prunedPartitions {
tables = append(tables, tblInfo{physTblID: p.GetPhysicalID(), isCommonHdl: p.Meta().IsCommonHandle})
}
keyRanges := append(firstKeyRanges.FirstPartitionRange(), secondKeyRanges.FirstPartitionRange()...)
ranges = append(ranges, keyRanges)
continue
} else {
tables = []tblInfo{{physTblID: getPhysicalTableID(e.table), isCommonHdl: e.table.Meta().IsCommonHandle}}
}
keyRange, err := distsql.IndexRangesToKVRanges(dctx, getPhysicalTableID(tbl), e.indexes[i].ID, e.ranges[i])
if err != nil {
return nil, err

// Build kv ranges for each group × table.
for _, ranges := range groupedRanges {
for _, t := range tables {
if isIdxScan {
kvRange, err := distsql.IndexRangesToKVRanges(dctx, t.physTblID, e.indexes[i].ID, ranges)
if err != nil {
return err
}
e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: kvRange.FirstPartitionRange(),
})
} else {
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, false, e.descs[i], t.isCommonHdl)
firstKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, firstPartRanges)
if err != nil {
return err
}
secondKV, err := distsql.TableHandleRangesToKVRanges(dctx, []int64{t.physTblID}, t.isCommonHdl, secondPartRanges)
if err != nil {
return err
}
combined := append(firstKV.FirstPartitionRange(), secondKV.FirstPartitionRange()...)
e.partialWorkerKVRanges[i] = append(e.partialWorkerKVRanges[i], &kvRangesWithPhysicalTblID{
PhysicalTableID: t.physTblID,
KeyRanges: combined,
})
}
}
}
ranges = append(ranges, keyRange.FirstPartitionRange())
}
return ranges, nil
return nil
}

func (e *IndexMergeReaderExecutor) startWorkers(ctx context.Context) error {
exitCh := make(chan struct{})
workCh := make(chan *indexMergeTableTask, 1)
fetchCh := make(chan *indexMergeTableTask, len(e.keyRanges))
fetchCh := make(chan *indexMergeTableTask, len(e.partialPlans))

e.startIndexMergeProcessWorker(ctx, workCh, fetchCh)

Expand Down Expand Up @@ -337,11 +388,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
e.dagPBs[workID].CollectExecutionSummaries = &collExec
}

var keyRanges [][]kv.KeyRange
if e.partitionTableMode {
keyRanges = e.partitionKeyRanges[workID]
} else {
keyRanges = [][]kv.KeyRange{e.keyRanges[workID]}
keyRanges := make([][]kv.KeyRange, 0, len(e.partialWorkerKVRanges[workID]))
for _, pkr := range e.partialWorkerKVRanges[workID] {
keyRanges = append(keyRanges, pkr.KeyRanges)
}
failpoint.Inject("startPartialIndexWorkerErr", func() error {
return errors.New("inject an error before start partialIndexWorker")
Expand Down Expand Up @@ -501,6 +550,8 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: ts.ByItems,
groupedRanges: ts.GroupedRanges,
groupByColIdxs: ts.GroupByColIdxs,
}

worker := &partialTableWorker{
Expand Down
27 changes: 8 additions & 19 deletions pkg/executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,10 +816,8 @@ type memIndexMergeReader struct {
memReaders []memReader
isIntersection bool

// partition mode
partitionMode bool // if it is accessing a partition table
partitionTables []table.PhysicalTable // partition tables to access
partitionKVRanges [][][]kv.KeyRange // kv ranges for these partition tables
partitionMode bool // if it is accessing a partition table
partialWorkerKVRanges [][]*kvRangesWithPhysicalTblID

keepOrder bool
compareExec
Expand Down Expand Up @@ -873,9 +871,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge
memReaders: memReaders,
isIntersection: indexMergeReader.isIntersection,

partitionMode: indexMergeReader.partitionTableMode,
partitionTables: indexMergeReader.prunedPartitions,
partitionKVRanges: indexMergeReader.partitionKeyRanges,
partitionMode: indexMergeReader.partitionTableMode,
partialWorkerKVRanges: indexMergeReader.partialWorkerKVRanges,

keepOrder: us.keepOrder,
compareExec: us.compareExec,
Expand Down Expand Up @@ -1048,19 +1045,12 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
hMap := kv.NewHandleMap()
// loop each memReaders and fill handle map
for i, reader := range m.memReaders {
// [partitionNum][rangeNum]
var readerKvRanges [][]kv.KeyRange
if m.partitionMode {
readerKvRanges = m.partitionKVRanges[i]
} else {
readerKvRanges = [][]kv.KeyRange{m.indexMergeReader.keyRanges[i]}
}
for j, kr := range readerKvRanges {
for _, pkr := range m.partialWorkerKVRanges[i] {
switch r := reader.(type) {
case *memTableReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
case *memIndexReader:
r.kvRanges = kr
r.kvRanges = pkr.KeyRanges
default:
return nil, errors.New("memReader have to be memTableReader or memIndexReader")
}
Expand All @@ -1071,8 +1061,7 @@ func (m *memIndexMergeReader) getHandles() (handles []kv.Handle, err error) {
// Filter same row.
for _, handle := range handles {
if _, ok := handle.(kv.PartitionHandle); !ok && m.partitionMode {
pid := m.partitionTables[j].GetPhysicalID()
handle = kv.NewPartitionHandle(pid, handle)
handle = kv.NewPartitionHandle(pkr.PhysicalTableID, handle)
}
if v, ok := hMap.Get(handle); !ok {
cnt := 1
Expand Down
27 changes: 18 additions & 9 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,12 @@ func isFullIndexMatch(candidate *candidatePath) bool {
}

func matchProperty(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult {
// This function may set the two fields below for the PropMatchedNeedMergeSort case, so we reset them here to
// avoid leaving the AccessPath with an inconsistent state when there are multiple calls to matchProperty with
// different properties.
path.GroupedRanges = nil
path.GroupByColIdxs = nil

if ds.Table.Type().IsClusterTable() && !prop.IsSortItemEmpty() {
// TableScan with cluster table can't keep order.
return property.PropNotMatched
Expand Down Expand Up @@ -1110,8 +1116,7 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
matchIdxes := make([]int, 0, 1)
for i, oneAlternative := range oneORBranch {
// if there is some sort items and this path doesn't match this prop, continue.
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if !noSortItem && matchProperty(ds, oneAlternative, prop) != property.PropMatched {
if !noSortItem && !matchProperty(ds, oneAlternative, prop).Matched() {
continue
}
// two possibility here:
Expand Down Expand Up @@ -1238,8 +1243,7 @@ func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, p
return property.PropNotMatched
}
for _, partialPath := range path.PartialIndexPaths {
// Satisfying the property by a merge sort is not supported for partial paths of index merge.
if matchProperty(ds, partialPath, prop) != property.PropMatched {
if !matchProperty(ds, partialPath, prop).Matched() {
return property.PropNotMatched
}
}
Expand Down Expand Up @@ -1960,17 +1964,20 @@ func convertToIndexMergeScan(ds *logicalop.DataSource, prop *property.PhysicalPr
}

func convertToPartialIndexScan(ds *logicalop.DataSource, physPlanPartInfo *PhysPlanPartInfo, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (base.PhysicalPlan, []expression.Expression, error) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
is := getOriginalPhysicalIndexScan(ds, prop, path, matchProp.Matched(), false)
// TODO: Consider using isIndexCoveringColumns() to avoid another TableRead
indexConds := path.IndexFilters
if matchProp.Matched() {
if is.Table.GetPartitionInfo() != nil && !is.Index.Global && is.SCtx().GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
is.Columns, is.schema, _ = AddExtraPhysTblIDColumn(is.SCtx(), is.Columns, is.schema)
}
// Add sort items for index scan for merge-sort operation between partitions.
// Add sort items for index scan for merge-sort operation between partitions or range groups.
is.ByItems = byItems
// Copy GroupedRanges for merge sort within this partial path (for IN conditions).
if len(path.GroupedRanges) > 0 {
is.GroupedRanges = path.GroupedRanges
is.GroupByColIdxs = path.GroupByColIdxs
}
}

// Add a `Selection` for `IndexScan` with global index.
Expand Down Expand Up @@ -2009,8 +2016,6 @@ func checkColinSchema(cols []*expression.Column, schema *expression.Schema) bool
}

func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.PhysicalProperty, path *util.AccessPath, matchProp property.PhysicalPropMatchResult, byItems []*util.ByItems) (tablePlan base.PhysicalPlan) {
intest.Assert(matchProp != property.PropMatchedNeedMergeSort,
"partial paths of index merge path should not match property using merge sort")
ts, rowCount := getOriginalPhysicalTableScan(ds, prop, path, matchProp.Matched())
overwritePartialTableScanSchema(ds, ts)
// remove ineffetive filter condition after overwriting physicalscan schema
Expand All @@ -2027,6 +2032,10 @@ func convertToPartialTableScan(ds *logicalop.DataSource, prop *property.Physical
ts.Columns, ts.schema, _ = AddExtraPhysTblIDColumn(ts.SCtx(), ts.Columns, ts.schema)
}
ts.ByItems = byItems
if len(path.GroupedRanges) > 0 {
ts.GroupedRanges = path.GroupedRanges
ts.GroupByColIdxs = path.GroupByColIdxs
}
}
if len(ts.filterCondition) > 0 {
selectivity, _, err := cardinality.Selectivity(ds.SCtx(), ds.TableStats.HistColl, ts.filterCondition, nil)
Expand Down
Loading