Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 88 additions & 30 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,9 +773,12 @@ type candidatePath struct {
// partialOrderMatch records the partial order match result for TopN optimization.
// When the matched is true, it means this path can provide partial order using prefix index.
partialOrderMatchResult property.PartialOrderMatchResult // Result of matching partial order property
indexJoinCols int // how many index columns are used in access conditions in this IndexJoin.
isFullRange bool // cached result of whether this path covers the full scan range.
eqOrInCount int // cached result of equalPredicateCount().
// sortItemsHintsSatisfied tracks which partial paths of an IndexMerge satisfy SortItemsHints.
// Length equals len(path.PartialIndexPaths). Only set when SortItems is empty and SortItemsHints is not.
sortItemsHintsSatisfied []bool
indexJoinCols int // how many index columns are used in access conditions in this IndexJoin.
isFullRange bool // cached result of whether this path covers the full scan range.
eqOrInCount int // cached result of equalPredicateCount().
}

func compareBool(l, r bool) int {
Expand Down Expand Up @@ -1410,49 +1413,85 @@ func GroupRangesByCols(ranges []*ranger.Range, groupByColIdxs []int) ([][]*range
//
// at last, according to determinedIndexPartialPaths to rewrite their real countAfterAccess, this part is move from deriveStats to
// here.
func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) (*util.AccessPath, property.PhysicalPropMatchResult) {
func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) (*util.AccessPath, []bool, property.PhysicalPropMatchResult) {
// target:
// 1: index merge case, try to match the every alternative partial path to the order property as long as
// possible, and generate that property-matched index merge path out if any.
// 2: If the prop is empty (means no sort requirement), we will generate a random index partial combination
// path from all alternatives in case that no index merge path comes out.
// 2: If the prop is empty (means no sort requirement) but SortItemsHints is set, prefer alternatives
// that can satisfy the SoftSortItems for potential Limit pushdown.
// 3: If neither, generate a random index partial combination path from all alternatives.

// Execution part doesn't support the merge operation for intersection case yet.
if path.IndexMergeIsIntersection {
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

noSortItem := prop.IsSortItemEmpty()
allSame, _ := prop.AllSameOrder()
if !allSame {
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

// When SortItems is empty and SortItemsHints is set, use hints as soft sort
// requirements. Alternatives that satisfy SortItemsHints can benefit from
// Limit pushdown.
useSortItemsHints := noSortItem && len(prop.SortItemsHints) > 0
var hintsProp *property.PhysicalProperty
if useSortItemsHints {
hintsProp = prop.CloneEssentialFields()
hintsProp.SortItems = hintsProp.SortItemsHints
}

// step1: match the property from all the index partial alternative paths.
determinedIndexPartialPaths := make([]*util.AccessPath, 0, len(path.PartialAlternativeIndexPaths))
usedIndexMap := make(map[int64]struct{}, 1)
useMVIndex := false
sortItemsHintsSatisfied := make([]bool, 0, len(path.PartialAlternativeIndexPaths))
for _, oneORBranch := range path.PartialAlternativeIndexPaths {
matchIdxes := make([]int, 0, 1)
for i, oneAlternative := range oneORBranch {
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
branchSatisfiesHints := false

if useSortItemsHints {
// First pass: prefer alternatives that satisfy SortItemsHints.
for i, oneAlternative := range oneORBranch {
match := true
for _, oneAccessPath := range oneAlternative {
if !matchProperty(ds, oneAccessPath, hintsProp).Matched() {
match = false
}
}
if match {
matchIdxes = append(matchIdxes, i)
}
}
if !match {
continue
if len(matchIdxes) > 0 {
branchSatisfiesHints = true
}
}

// If no matches found with hints, fall back to default matching logic.
if len(matchIdxes) == 0 {
Comment thread
qw4990 marked this conversation as resolved.
for i, oneAlternative := range oneORBranch {
// if there is some sort items and this path doesn't match this prop, continue.
match := true
for _, oneAccessPath := range oneAlternative {
if !noSortItem && !matchProperty(ds, oneAccessPath, prop).Matched() {
match = false
}
}
if !match {
continue
}
// two possibility here:
// 1. no sort items requirement.
// 2. matched with sorted items.
matchIdxes = append(matchIdxes, i)
}
// two possibility here:
// 1. no sort items requirement.
// 2. matched with sorted items.
matchIdxes = append(matchIdxes, i)
}
if len(matchIdxes) == 0 {
// if all index alternative of one of the cnf item's couldn't match the sort property,
// the entire index merge union path can be ignored for this sort property, return false.
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}
if len(matchIdxes) > 1 {
// if matchIdxes greater than 1, we should sort this match alternative path by its CountAfterAccess.
Expand Down Expand Up @@ -1488,12 +1527,13 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
}
// record the lowestCountAfterAccessIdx's chosen index.
usedIndexMap[indexID] = struct{}{}
sortItemsHintsSatisfied = append(sortItemsHintsSatisfied, branchSatisfiesHints)
}
// since all the choice is done, check the all single index limitation, skip check for mv index.
// since ds index merge hints will prune other path ahead, lift the all single index limitation here.
if len(usedIndexMap) == 1 && !useMVIndex && len(ds.IndexMergeHints) <= 0 {
// if all partial path are using a same index, meaningless and fail over.
return nil, property.PropNotMatched
return nil, nil, property.PropNotMatched
}

// check if any of the partial paths is not cacheable.
Expand All @@ -1511,8 +1551,8 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
PartialIndexPaths: determinedIndexPartialPaths,
IndexMergeIsIntersection: false,
// inherit those determined can't pushed-down table filters.
TableFilters: path.TableFilters,
NoncacheableReason: notCachableReason,
TableFilters: path.TableFilters,
NoncacheableReason: notCachableReason,
}
// path.ShouldBeKeptCurrentFilter record that whether there are some part of the cnf item couldn't be pushed down to tikv already.
shouldKeepCurrentFilter := path.KeepIndexMergeORSourceFilter
Expand All @@ -1532,9 +1572,9 @@ func matchPropForIndexMergeAlternatives(ds *logicalop.DataSource, path *util.Acc
if noSortItem {
// since there is no sort property, index merge case is generated by random combination, each alternative with the lower/lowest
// countAfterAccess, here the returned matchProperty should be PropNotMatched.
return indexMergePath, property.PropNotMatched
return indexMergePath, sortItemsHintsSatisfied, property.PropNotMatched
}
return indexMergePath, property.PropMatched
return indexMergePath, sortItemsHintsSatisfied, property.PropMatched
}

func isMatchPropForIndexMerge(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) property.PhysicalPropMatchResult {
Expand Down Expand Up @@ -1598,12 +1638,15 @@ func getIndexCandidateForIndexJoin(sctx planctx.PlanContext, path *util.AccessPa
}

func convergeIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
// since the all index path alternative paths is collected and undetermined, and we should determine a possible and concrete path for this prop.
possiblePath, match := matchPropForIndexMergeAlternatives(ds, path, prop)
possiblePath, sortHintsSatisfied, match := matchPropForIndexMergeAlternatives(ds, path, prop)
if possiblePath == nil {
return nil
}
candidate := &candidatePath{path: possiblePath, matchPropResult: match}
candidate := &candidatePath{
path: possiblePath,
matchPropResult: match,
sortItemsHintsSatisfied: sortHintsSatisfied,
}
candidate.isFullRange = possiblePath.IsFullScanRange(ds.TableInfo)
candidate.eqOrInCount = candidate.equalPredicateCount()
return candidate
Expand All @@ -1612,6 +1655,20 @@ func convergeIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath
func getIndexMergeCandidate(ds *logicalop.DataSource, path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
candidate := &candidatePath{path: path}
candidate.matchPropResult = isMatchPropForIndexMerge(ds, path, prop)

// When SortItems is empty and SortItemsHints is set, check which partial
// paths satisfy the hints (for Limit pushdown).
if prop.IsSortItemEmpty() && len(prop.SortItemsHints) > 0 {
hintsProp := prop.CloneEssentialFields()
hintsProp.SortItems = hintsProp.SortItemsHints
candidate.sortItemsHintsSatisfied = make([]bool, 0, len(path.PartialIndexPaths))
for _, partialPath := range path.PartialIndexPaths {
satisfied := matchProperty(ds, partialPath, hintsProp).Matched()
candidate.sortItemsHintsSatisfied = append(
candidate.sortItemsHintsSatisfied, satisfied)
}
}

candidate.isFullRange = path.IsFullScanRange(ds.TableInfo)
candidate.eqOrInCount = candidate.equalPredicateCount()
return candidate
Expand Down Expand Up @@ -2202,7 +2259,7 @@ func convertToIndexMergeScan(ds *logicalop.DataSource, prop *property.PhysicalPr
}
failpoint.Inject("forceIndexMergeKeepOrder", func(_ failpoint.Value) {
if len(candidate.path.PartialIndexPaths) > 0 && !candidate.path.IndexMergeIsIntersection {
if prop.IsSortItemEmpty() {
if prop.IsSortItemEmpty() && len(prop.SortItemsHints) == 0 {
failpoint.Return(base.InvalidTask, nil)
}
}
Expand Down Expand Up @@ -2271,6 +2328,7 @@ func convertToIndexMergeScan(ds *logicalop.DataSource, prop *property.PhysicalPr
cop.IdxMergePartPlans = scans
cop.IdxMergeIsIntersection = path.IndexMergeIsIntersection
cop.IdxMergeAccessMVIndex = path.IndexMergeAccessMVIndex
cop.IdxMergePartPlansSatisfySortHints = candidate.sortItemsHintsSatisfied
if len(globalRemainingFilters) != 0 {
cop.RootTaskConds = globalRemainingFilters
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/planner/core/operator/physicalop/physical_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,25 @@ func getPhysTopN(lt *logicalop.LogicalTopN, prop *property.PhysicalProperty) []b
}
ret := make([]base.PhysicalPlan, 0, len(allTaskTypes)+1)

// When TopN is directly above a DataSource, set SortItemsHints so that
// IndexMerge can prefer partial paths that satisfy the ORDER BY.
// This enables pushing Limit to ordered partial paths.
var sortItemsHints []property.SortItem
Comment thread
qw4990 marked this conversation as resolved.
Outdated
if _, ok := lt.Children()[0].(*logicalop.DataSource); ok && len(lt.ByItems) > 0 {
sortItemsHints = make([]property.SortItem, 0, len(lt.ByItems))
for _, byItem := range lt.ByItems {
col, ok := byItem.Expr.(*expression.Column)
if !ok {
sortItemsHints = nil
break
}
sortItemsHints = append(sortItemsHints, property.SortItem{
Col: col,
Desc: byItem.Desc,
})
}
}

// Generate candidate plans for partial order optimization using prefix index FIRST.
// This is important because when use_index hint is used with a prefix index,
// we need to set ForcePartialOrder flag before other candidates are evaluated.
Expand All @@ -290,7 +309,8 @@ func getPhysTopN(lt *logicalop.LogicalTopN, prop *property.PhysicalProperty) []b

for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64,
CTEProducerStatus: prop.CTEProducerStatus, NoCopPushDown: prop.NoCopPushDown}
CTEProducerStatus: prop.CTEProducerStatus, NoCopPushDown: prop.NoCopPushDown,
SortItemsHints: sortItemsHints}
topN := PhysicalTopN{
ByItems: lt.ByItems,
PartitionBy: lt.PartitionBy,
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/operator/physicalop/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ type CopTask struct {
// Set by convertToIndexScan when a prefix index provides partial order for TopN.
PartialOrderMatchResult *property.PartialOrderMatchResult

// IdxMergePartPlansSatisfySortHints tracks which partial paths of an IndexMerge
// satisfy SortItemsHints. Length equals len(IdxMergePartPlans).
// Set by convertToIndexMergeScan when SortItemsHints is used.
IdxMergePartPlansSatisfySortHints []bool

// Warnings passed through different task copy attached with more upper operator specific Warnings. (not concurrent safe)
Warnings SimpleWarnings
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,22 @@ func attach2Task4PhysicalTopN(pp base.PhysicalPlan, tasks ...base.Task) base.Tas
}
}
if copTask, ok := t.(*physicalop.CopTask); ok && needPushDown && canPushDownToTiKV(p, copTask) && len(copTask.RootTaskConds) == 0 {
// Handle IndexMerge with SortItemsHints when some (but not all) partial
// paths satisfy the sort hints. When all paths satisfy, the existing
// Limit pushdown via attach2Task4PhysicalLimit gives a better plan.
if len(copTask.IdxMergePartPlans) > 0 && !copTask.IndexPlanFinished &&
len(copTask.IdxMergePartPlansSatisfySortHints) > 0 {
allSatisfy := true
for _, s := range copTask.IdxMergePartPlansSatisfySortHints {
if !s {
allSatisfy = false
break
}
}
if !allSatisfy {
return handleSortItemsHintsForIndexMerge(p, copTask)
}
}
// If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and
// push it to table plan.
var pushedDownTopN *physicalop.PhysicalTopN
Expand Down Expand Up @@ -1435,6 +1451,46 @@ func estimateMaxXForPartialOrder() uint64 {
return 0
}

// handleSortItemsHintsForIndexMerge handles TopN pushdown when IndexMerge has
// SortItemsHints satisfaction info. It pushes Limit to partial paths that
// satisfy the sort order and TopN to those that don't, then keeps a root TopN
// for final merge.
func handleSortItemsHintsForIndexMerge(p *physicalop.PhysicalTopN, copTask *physicalop.CopTask) base.Task {
newCount := p.Offset + p.Count

for i, partialPlan := range copTask.IdxMergePartPlans {
if i < len(copTask.IdxMergePartPlansSatisfySortHints) &&
copTask.IdxMergePartPlansSatisfySortHints[i] {
// This partial path satisfies the sort order, push Limit.
childProfile := partialPlan.StatsInfo()
stats := property.DeriveLimitStats(childProfile, float64(newCount))
pushedDownLimit := physicalop.PhysicalLimit{
Count: newCount,
}.Init(p.SCtx(), stats, p.QueryBlockOffset())
pushedDownLimit.SetChildren(partialPlan)
pushedDownLimit.SetSchema(partialPlan.Schema())
copTask.IdxMergePartPlans[i] = pushedDownLimit
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else {
// This partial path does not satisfy the sort order, push TopN.
pushedDownTopN, _ := getPushedDownTopN(p, partialPlan, copTask.GetStoreType())
copTask.IdxMergePartPlans[i] = pushedDownTopN
}
}

// Push TopN to the table plan side if it exists.
if copTask.TablePlan != nil {
pushedDownTopN, _ := getPushedDownTopN(p, copTask.TablePlan, copTask.GetStoreType())
copTask.TablePlan = pushedDownTopN
}

// Keep the root TopN as the final merge layer.
rootTask := copTask.ConvertToRootTask(p.SCtx())
if len(p.GetPartitionBy()) > 0 {
return rootTask
}
return attachPlan2Task(p, rootTask)
}

// attach2Task4PhysicalProjection implements PhysicalPlan interface.
func attach2Task4PhysicalProjection(pp base.PhysicalPlan, tasks ...base.Task) base.Task {
p := pp.(*physicalop.PhysicalProjection)
Expand Down
24 changes: 22 additions & 2 deletions pkg/planner/property/physical_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ type PhysicalProperty struct {
// The partialOrderInfo property will pass through to the datasource and try to matchPartialOrderProperty such as:
// index: (a, b(10) )
PartialOrderInfo *PartialOrderInfo

// SortItemsHints contains sort items that are preferred but not required.
// When SortItems is empty and SortItemsHints is not, DataSource can try to
// generate paths that satisfy these sort items, enabling Limit pushdown to
// partial paths of IndexMerge.
// Currently only set when TopN is directly above a DataSource.
SortItemsHints []SortItem
}

// PartialOrderInfo records information needed for partial order optimization.
Expand Down Expand Up @@ -579,7 +586,7 @@ func (p *PhysicalProperty) HashCode() []byte {
if p.hashcode != nil {
return p.hashcode
}
hashcodeSize := 8 + 8 + 8 + (16+8)*len(p.SortItems) + 8
hashcodeSize := 8 + 8 + 8 + (16+8)*len(p.SortItems) + 8 + (16+8)*len(p.SortItemsHints) + 8
if p.PartialOrderInfo != nil {
hashcodeSize += (16 + 8) * len(p.PartialOrderInfo.SortItems)
} else {
Expand Down Expand Up @@ -652,6 +659,15 @@ func (p *PhysicalProperty) HashCode() []byte {
} else {
p.hashcode = codec.EncodeInt(p.hashcode, 0)
}
// encode SortItemsHints into physical prop's hashcode.
for _, item := range p.SortItemsHints {
p.hashcode = append(p.hashcode, item.Col.HashCode()...)
if item.Desc {
p.hashcode = codec.EncodeInt(p.hashcode, 1)
} else {
p.hashcode = codec.EncodeInt(p.hashcode, 0)
}
}
return p.hashcode
}

Expand All @@ -672,7 +688,8 @@ func (p *PhysicalProperty) CloneEssentialFields() *PhysicalProperty {
MPPPartitionCols: p.MPPPartitionCols,
CTEProducerStatus: p.CTEProducerStatus,
NoCopPushDown: p.NoCopPushDown,
PartialOrderInfo: p.PartialOrderInfo, // Copy PartialOrderInfo for TopN partial order optimization
PartialOrderInfo: p.PartialOrderInfo,
SortItemsHints: p.SortItemsHints,
// we default not to clone basic indexJoinProp by default.
// and only call admitIndexJoinProp to inherit the indexJoinProp for special pattern operators.
}
Expand Down Expand Up @@ -710,6 +727,9 @@ func (p *PhysicalProperty) MemoryUsage() (sum int64) {
for _, mppCol := range p.MPPPartitionCols {
sum += mppCol.MemoryUsage()
}
for _, sortItem := range p.SortItemsHints {
sum += sortItem.MemoryUsage()
}
return
}

Expand Down
Loading
Loading