Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/planner/cardinality/testdata/cardinality_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,8 @@
"TopN 1.00 root test.t.c, offset:0, count:1",
"└─IndexMerge 1.00 root type: union",
" ├─IndexRangeScan(Build) 510.00 cop[tikv] table:t, index:ib(b) range:[0,50], keep order:false, stats:pseudo",
" ├─IndexRangeScan(Build) 510.00 cop[tikv] table:t, index:ic(c) range:[0,50], keep order:false, stats:pseudo",
" ├─Limit(Build) 1.00 cop[tikv] offset:0, count:1",
" │ └─IndexRangeScan 510.00 cop[tikv] table:t, index:ic(c) range:[0,50], keep order:false, stats:pseudo",
" └─TopN(Probe) 1.00 cop[tikv] test.t.c, offset:0, count:1",
" └─TableRowIDScan 1017.40 cop[tikv] table:t keep order:false, stats:pseudo"
]
Expand Down
113 changes: 86 additions & 27 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,12 @@ type candidatePath struct {
// partialOrderMatch records the partial order match result for TopN optimization.
// When the matched is true, it means this path can provide partial order using prefix index.
partialOrderMatchResult property.PartialOrderMatchResult // Result of matching partial order property
indexJoinCols int // how many index columns are used in access conditions in this IndexJoin.
isFullRange bool // cached result of whether this path covers the full scan range.
eqOrInCount int // cached result of equalPredicateCount().
// sortItemsHintsSatisfied tracks which partial paths of an IndexMerge satisfy SortItemsHints.
// Length equals len(path.PartialIndexPaths). Only set when SortItems is empty and SortItemsHints is not.
sortItemsHintsSatisfied []bool
indexJoinCols int // how many index columns are used in access conditions in this IndexJoin.
isFullRange bool // cached result of whether this path covers the full scan range.
eqOrInCount int // cached result of equalPredicateCount().
}

func compareBool(l, r bool) int {
Expand Down Expand Up @@ -1410,49 +1413,85 @@ func GroupRangesByCols(ranges []*ranger.Range, groupByColIdxs []int) ([][]*range
//
// at last, according to determinedIndexPartialPaths to rewrite their real countAfterAccess, this part is move from deriveStats to
// here.
func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) (*util.AccessPath, property.PhysicalPropMatchResult) {
func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) (*util.AccessPath, []bool, property.PhysicalPropMatchResult) {
// target:
// 1: index merge case, try to match the every alternative partial path to the order property as long as
// possible, and generate that property-matched index merge path out if any.
// 2: If the prop is empty (means no sort requirement), we will generate a random index partial combination
// path from all alternatives in case that no index merge path comes out.
// 2: If the prop is empty (means no sort requirement) but SortItemsHints is set, prefer alternatives
// that can satisfy the SoftSortItems for potential Limit pushdown.
// 3: If neither, generate a random index partial combination path from all alternatives.

// Execution part doesn't support the merge operation for intersection case yet.
if path.IndexMergeIsIntersection {
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

noSortItem := prop.IsSortItemEmpty()
allSame, _ := prop.AllSameOrder()
if !allSame {
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

// When SortItems is empty and SortItemsHints is set, use hints as soft sort
// requirements. Alternatives that satisfy SortItemsHints can benefit from
// Limit pushdown.
useSortItemsHints := noSortItem && len(prop.SortItemsHints) > 0
var hintsProp *property.PhysicalProperty
if useSortItemsHints {
hintsProp = prop.CloneEssentialFields()
hintsProp.SortItems = hintsProp.SortItemsHints
}

// step1: match the property from all the index partial alternative paths.
determinedIndexPartialPaths := make([]*util.AccessPath, 0, len(path.PartialAlternativeIndexPaths))
usedIndexMap := make(map[int64]struct{}, 1)
useMVIndex := false
sortItemsHintsSatisfied := make([]bool, 0, len(path.PartialAlternativeIndexPaths))
for _, oneORBranch := range path.PartialAlternativeIndexPaths {
matchIdxes := make([]int, 0, 1)
for i, oneAlternative := range oneORBranch {
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
branchSatisfiesHints := false

if useSortItemsHints {
// First pass: prefer alternatives that satisfy SortItemsHints.
for i, oneAlternative := range oneORBranch {
match := true
for _, oneAccessPath := range oneAlternative {
if !matchProperty(ds, oneAccessPath, hintsProp).Matched() {
match = false
}
}
if match {
matchIdxes = append(matchIdxes, i)
}
}
if !match {
continue
if len(matchIdxes) > 0 {
branchSatisfiesHints = true
}
}

// If no matches found with hints, fall back to default matching logic.
if len(matchIdxes) == 0 {
Comment thread
qw4990 marked this conversation as resolved.
for i, oneAlternative := range oneORBranch {
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
}
}
if !match {
continue
}
// two possibility here:
// 1. no sort items requirement.
// 2. matched with sorted items.
matchIdxes = append(matchIdxes, i)
}
// two possibility here:
// 1. no sort items requirement.
// 2. matched with sorted items.
matchIdxes = append(matchIdxes, i)
}
if len(matchIdxes) == 0 {
// if all index alternative of one of the cnf item's couldn't match the sort property,
// the entire index merge union path can be ignored for this sort property, return false.
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}
if len(matchIdxes) > 1 {
// if matchIdxes greater than 1, we should sort this match alternative path by its CountAfterAccess.
Expand Down Expand Up @@ -1488,12 +1527,13 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
}
// record the lowestCountAfterAccessIdx's chosen index.
usedIndexMap[indexID] = struct{}{}
sortItemsHintsSatisfied = append(sortItemsHintsSatisfied, branchSatisfiesHints)
}
// since all the choice is done, check the all single index limitation, skip check for mv index.
// since ds index merge hints will prune other path ahead, lift the all single index limitation here.
if len(usedIndexMap) == 1 && !useMVIndex && len(ds.IndexMergeHints) <= 0 {
// if all partial path are using a same index, meaningless and fail over.
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

// check if any of the partial paths is not cacheable.
Expand Down Expand Up @@ -1532,9 +1572,9 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
if noSortItem {
// since there is no sort property, index merge case is generated by random combination, each alternative with the lower/lowest
// countAfterAccess, here the returned matchProperty should be PropNotMatched.
return indexMergePath, property.PropNotMatched
return indexMergePath, sortItemsHintsSatisfied, property.PropNotMatched
}
return indexMergePath, property.PropMatched
return indexMergePath, sortItemsHintsSatisfied, property.PropMatched
}

func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult {
Expand Down Expand Up @@ -1598,12 +1638,15 @@ func getIndexCandidateForIndexJoin(sctx planctx.PlanContext, path *util.AccessPa
}

func convergeIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
// since the all index path alternative paths is collected and undetermined, and we should determine a possible and concrete path for this prop.
possiblePath, match := matchPropForIndexMergeAlternatives(ds, path, prop)
possiblePath, sortHintsSatisfied, match := matchPropForIndexMergeAlternatives(ds, path, prop)
if possiblePath == nil {
return nil
}
candidate := &candidatePath{path: possiblePath, matchPropResult: match}
candidate := &candidatePath{
path: possiblePath,
matchPropResult: match,
sortItemsHintsSatisfied: sortHintsSatisfied,
}
candidate.isFullRange = possiblePath.IsFullScanRange(ds.TableInfo)
candidate.eqOrInCount = candidate.equalPredicateCount()
return candidate
Expand All @@ -1612,6 +1655,21 @@ func convergeIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath
func getIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
candidate := &candidatePath{path: path}
candidate.matchPropResult = isMatchPropForIndexMerge(ds, path, prop)

allSameOrder, _ := prop.AllSameOrder()
// When SortItems is empty and SortItemsHints is set, check which partial
// paths satisfy the hints (for Limit pushdown).
if prop.IsSortItemEmpty() && len(prop.SortItemsHints) > 0 && !path.IndexMergeIsIntersection && allSameOrder {
hintsProp := prop.CloneEssentialFields()
hintsProp.SortItems = hintsProp.SortItemsHints
candidate.sortItemsHintsSatisfied = make([]bool, 0, len(path.PartialIndexPaths))
for _, partialPath := range path.PartialIndexPaths {
satisfied := matchProperty(ds, partialPath, hintsProp).Matched()
candidate.sortItemsHintsSatisfied = append(
candidate.sortItemsHintsSatisfied, satisfied)
}
}

candidate.isFullRange = path.IsFullScanRange(ds.TableInfo)
candidate.eqOrInCount = candidate.equalPredicateCount()
return candidate
Expand Down Expand Up @@ -2271,6 +2329,7 @@ func convertToIndexMergeScan(ds *logicalop.DataSource, prop *property.PhysicalPr
cop.IdxMergePartPlans = scans
cop.IdxMergeIsIntersection = path.IndexMergeIsIntersection
cop.IdxMergeAccessMVIndex = path.IndexMergeAccessMVIndex
cop.IdxMergePartPlansSatisfySortHints = candidate.sortItemsHintsSatisfied
if len(globalRemainingFilters) != 0 {
cop.RootTaskConds = globalRemainingFilters
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/planner/core/operator/physicalop/physical_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,25 @@ func getPhysTopN(lt *logicalop.LogicalTopN, prop *property.PhysicalProperty) []b
}
ret := make([]base.PhysicalPlan, 0, len(allTaskTypes)+1)

// When TopN is directly above a DataSource, set SortItemsHints so that
// IndexMerge can prefer partial paths that satisfy the ORDER BY.
// This enables pushing Limit to ordered partial paths.
var sortItemsHints []property.SortItem
Comment thread
qw4990 marked this conversation as resolved.
Outdated
if _, ok := lt.Children()[0].(*logicalop.DataSource); ok && len(lt.ByItems) > 0 {
sortItemsHints = make([]property.SortItem, 0, len(lt.ByItems))
for _, byItem := range lt.ByItems {
col, ok := byItem.Expr.(*expression.Column)
if !ok {
sortItemsHints = nil
break
}
sortItemsHints = append(sortItemsHints, property.SortItem{
Col: col,
Desc: byItem.Desc,
})
}
}

// Generate candidate plans for partial order optimization using prefix index FIRST.
// This is important because when use_index hint is used with a prefix index,
// we need to set ForcePartialOrder flag before other candidates are evaluated.
Expand All @@ -290,7 +309,8 @@ func getPhysTopN(lt *logicalop.LogicalTopN, prop *property.PhysicalProperty) []b

for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64,
CTEProducerStatus: prop.CTEProducerStatus, NoCopPushDown: prop.NoCopPushDown}
CTEProducerStatus: prop.CTEProducerStatus, NoCopPushDown: prop.NoCopPushDown,
SortItemsHints: sortItemsHints}
topN := PhysicalTopN{
ByItems: lt.ByItems,
PartitionBy: lt.PartitionBy,
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/operator/physicalop/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ type CopTask struct {
// Set by convertToIndexScan when a prefix index provides partial order for TopN.
PartialOrderMatchResult *property.PartialOrderMatchResult

// IdxMergePartPlansSatisfySortHints tracks which partial paths of an IndexMerge
// satisfy SortItemsHints. Length equals len(IdxMergePartPlans).
// Set by convertToIndexMergeScan when SortItemsHints is used.
IdxMergePartPlansSatisfySortHints []bool

// Warnings passed through different task copy attached with more upper operator specific Warnings. (not concurrent safe)
Warnings SimpleWarnings
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,22 @@ func attach2Task4PhysicalTopN(pp base.PhysicalPlan, tasks ...base.Task) base.Tas
}
}
if copTask, ok := t.(*physicalop.CopTask); ok && needPushDown && canPushDownToTiKV(p, copTask) && len(copTask.RootTaskConds) == 0 {
// Handle IndexMerge with SortItemsHints when some (but not all) partial
// paths satisfy the sort hints. When all paths satisfy, the existing
// Limit pushdown via attach2Task4PhysicalLimit gives a better plan.
if len(copTask.IdxMergePartPlans) > 0 && !copTask.IndexPlanFinished && !copTask.IdxMergeIsIntersection &&
len(copTask.IdxMergePartPlansSatisfySortHints) > 0 {
allSatisfy := true
for _, s := range copTask.IdxMergePartPlansSatisfySortHints {
if !s {
allSatisfy = false
break
}
}
if !allSatisfy {
return handleSortItemsHintsForIndexMerge(p, copTask)
}
}
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
var pushedDownTopN *physicalop.PhysicalTopN
Expand Down Expand Up @@ -1435,6 +1451,55 @@ func estimateMaxXForPartialOrder() uint64 {
return 0
}

// handleSortItemsHintsForIndexMerge handles TopN pushdown when IndexMerge has
// SortItemsHints satisfaction info. It pushes Limit to partial paths that
// satisfy the sort order and TopN to those that don't, then keeps a root TopN
// for final merge.
func handleSortItemsHintsForIndexMerge(p *physicalop.PhysicalTopN, copTask *physicalop.CopTask) base.Task {
newCount := p.Offset + p.Count

cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
for _, expr := range p.GetPartitionBy() {
newPartitionBy = append(newPartitionBy, expr.Clone())
}

for i, partialPlan := range copTask.IdxMergePartPlans {
if copTask.IdxMergePartPlansSatisfySortHints[i] {
// This partial path satisfies the sort order, push Limit.
childProfile := partialPlan.StatsInfo()
stats := property.DeriveLimitStats(childProfile, float64(newCount))
pushedDownLimit := physicalop.PhysicalLimit{
Count: newCount,
PartitionBy: newPartitionBy,
}.Init(p.SCtx(), stats, p.QueryBlockOffset())
pushedDownLimit.SetChildren(partialPlan)
pushedDownLimit.SetSchema(partialPlan.Schema())
copTask.IdxMergePartPlans[i] = pushedDownLimit
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else if canPushToIndexPlan(partialPlan, cols) {
// This partial path does not satisfy the sort order, push TopN.
pushedDownTopN, _ := getPushedDownTopN(p, partialPlan, copTask.GetStoreType())
copTask.IdxMergePartPlans[i] = pushedDownTopN
}
}

// Push TopN to the table plan side if it exists.
if copTask.TablePlan != nil {
pushedDownTopN, _ := getPushedDownTopN(p, copTask.TablePlan, copTask.GetStoreType())
copTask.TablePlan = pushedDownTopN
}

// Keep the root TopN as the final merge layer.
rootTask := copTask.ConvertToRootTask(p.SCtx())
if len(p.GetPartitionBy()) > 0 {
return rootTask
}
return attachPlan2Task(p, rootTask)
}

// attach2Task4PhysicalProjection implements PhysicalPlan interface.
func attach2Task4PhysicalProjection(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
p := pp.(*physicalop.PhysicalProjection)
Expand Down
22 changes: 21 additions & 1 deletion pkg/planner/property/physical_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ type PhysicalProperty struct {
// The partialOrderInfo property will pass through to the datasource and try to matchPartialOrderProperty such as:
// index: (a, b(10) )
PartialOrderInfo *PartialOrderInfo

// SortItemsHints contains sort items that are preferred but not required.
// When SortItems is empty and SortItemsHints is not, DataSource can try to
// generate paths that satisfy these sort items, enabling Limit pushdown to
// partial paths of IndexMerge.
// Currently only set when TopN is directly above a DataSource.
SortItemsHints []SortItem
}

// PartialOrderInfo records information needed for partial order optimization.
Expand Down Expand Up @@ -579,7 +586,7 @@ func (p *PhysicalProperty) HashCode() []byte {
if p.hashcode != nil {
return p.hashcode
}
hashcodeSize := 8 + 8 + 8 + (16+8)*len(p.SortItems) + 8
hashcodeSize := 8 + 8 + 8 + (16+8)*len(p.SortItems) + 8 + (16+8)*len(p.SortItemsHints) + 8
if p.PartialOrderInfo != nil {
hashcodeSize += (16 + 8) * len(p.PartialOrderInfo.SortItems)
} else {
Expand Down Expand Up @@ -652,6 +659,15 @@ func (p *PhysicalProperty) HashCode() []byte {
} else {
p.hashcode = codec.EncodeInt(p.hashcode, 0)
}
// encode SortItemsHints into physical prop's hashcode.
for _, item := range p.SortItemsHints {
p.hashcode = append(p.hashcode, item.Col.HashCode()...)
if item.Desc {
p.hashcode = codec.EncodeInt(p.hashcode, 1)
} else {
p.hashcode = codec.EncodeInt(p.hashcode, 0)
}
}
return p.hashcode
}

Expand All @@ -673,6 +689,7 @@ func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty {
CTEProducerStatus: p.CTEProducerStatus,
NoCopPushDown: p.NoCopPushDown,
PartialOrderInfo: p.PartialOrderInfo, // Copy PartialOrderInfo for TopN partial order optimization
SortItemsHints: p.SortItemsHints,
// we default not to clone basic indexJoinProp by default.
// and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators.
}
Expand Down Expand Up @@ -710,6 +727,9 @@ func (p *PhysicalProperty) MemoryUsage() (sum int64) {
for _, mppCol := range p.MPPPartitionCols {
sum += mppCol.MemoryUsage()
}
for _, sortItem := range p.SortItemsHints {
sum += sortItem.MemoryUsage()
}
return
}

Expand Down
Loading
Loading