Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/planner/cardinality/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
timeout = "short",
srcs = [
"main_test.go",
"ndv_test.go",
"row_count_test.go",
"row_size_test.go",
"selectivity_test.go",
Expand Down
12 changes: 12 additions & 0 deletions pkg/planner/cardinality/pseudo.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func PseudoAvgCountPerValue(t *statistics.Table) float64 {
return float64(t.RealtimeCount) / pseudoEqualRate
}

// AggregateSelectedPartitionCounts aggregates the realtime and modify count from selected partitions.
func AggregateSelectedPartitionCounts(partitionStats []*statistics.Table) (realtimeCount, modifyCount int64, ok bool) {
for _, partStats := range partitionStats {
if partStats == nil || partStats.Pseudo {
return 0, 0, false
}
realtimeCount += partStats.RealtimeCount
modifyCount += partStats.ModifyCount
}
return realtimeCount, modifyCount, true
}

func pseudoSelectivity(sctx planctx.PlanContext, coll *statistics.HistColl, exprs []expression.Expression) float64 {
minFactor := selectionFactor
colExists := make(map[string]bool)
Expand Down
52 changes: 35 additions & 17 deletions pkg/planner/core/casetest/instanceplancache/others_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,23 +366,41 @@ func TestInstancePlanCachePartitioning(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(`set global tidb_enable_instance_plan_cache=1`)
tk.MustExec(`set @@tidb_partition_prune_mode='dynamic'`)

tk.MustExec(`create table t (a int, b varchar(255)) partition by hash(a) partitions 3`)
tk.MustExec(`insert into t values (1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f")`)
tk.MustExec(`prepare stmt from 'select a,b from t where a = ?;'`)
tk.MustExec(`set @a=1`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("1 a"))
// Same partition works, due to pruning is not affected
tk.MustExec(`set @a=4`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(`set @@tidb_partition_prune_mode='static'`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1105 skip prepared plan-cache: Static partition pruning mode"))
for _, selectedPartitionStats := range []string{"0", "1"} {
tk.MustExec(`admin flush instance plan_cache`)
tk.MustExec(`set @@tidb_opt_enable_selected_partition_stats=` + selectedPartitionStats)
tk.MustExec(`set @@tidb_partition_prune_mode='dynamic'`)
tk.MustExec(`drop table if exists t`)
tk.MustExec(`create table t (a int, b varchar(255)) partition by hash(a) partitions 3`)
tk.MustExec(`insert into t values (1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f")`)
tk.MustExec(`prepare stmt from 'select a,b from t where a = ?;'`)
tk.MustExec(`set @a=1`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("1 a"))
// Result correctness stays the same; whether the second execution hits cache depends on the selected partition stats switch.
tk.MustExec(`set @a=4`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
expectedFromCache := "1"
if selectedPartitionStats == "1" {
expectedFromCache = "0"
}
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows(expectedFromCache))
tk.MustQuery(`show warnings`).Check(testkit.Rows())

tk.MustExec(`set @@tidb_partition_prune_mode='static'`)
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
tk.MustQuery(`execute stmt using @a`).Check(testkit.Rows("4 d"))
warnings := tk.MustQuery(`show warnings`).Rows()
if len(warnings) > 0 {
require.True(t,
strings.Contains(warnings[0][2].(string), "skip prepared plan-cache: Static partition pruning mode") ||
strings.Contains(warnings[0][2].(string), "skip prepared plan-cache: static partition prune mode used"),
"unexpected warning: %s", warnings[0][2].(string),
)
}
tk.MustExec(`deallocate prepare stmt`)
tk.MustExec(`drop table t`)
}
}

func TestInstancePlanCachePlan(t *testing.T) {
Expand Down
72 changes: 72 additions & 0 deletions pkg/planner/core/casetest/planstats/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,75 @@ func TestPartialStatsInExplain(t *testing.T) {
require.NoError(t, dom.StatsHandle().LoadNeededHistograms(dom.InfoSchema()))
}
}

func TestDynamicPartitionPruneUsesMergedPartitionStats(t *testing.T) {
p := parser.New()
store, dom := testkit.CreateMockStoreAndDomain(t)

testKit := testkit.NewTestKit(t, store)
ctx := testKit.Session().(sessionctx.Context)

testKit.MustExec("use test")
testKit.MustExec("drop table if exists pt")
testKit.MustExec("set @@session.tidb_partition_prune_mode = 'dynamic'")
testKit.MustExec("set @@session.tidb_analyze_version = 2")
testKit.MustExec("set @@session.tidb_stats_load_sync_wait = 60000")
testKit.MustExec(
"create table pt(a int, b int) partition by range(a) (" +
"partition p0 values less than (100)," +
"partition p1 values less than (200)," +
"partition p2 values less than maxvalue)",
)

rows := make([]string, 0, 1200)
for i := 0; i < 1200; i++ {
rows = append(rows, fmt.Sprintf("(%d,%d)", i, i))
}
const batchSize = 200
for start := 0; start < len(rows); start += batchSize {
end := start + batchSize
if end > len(rows) {
end = len(rows)
}
testKit.MustExec("insert into pt values " + strings.Join(rows[start:end], ","))
}

oriLease := dom.StatsHandle().Lease()
dom.StatsHandle().SetLease(1)
defer func() {
dom.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table pt all columns")
dom.StatsHandle().Clear()
require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema()))

tbl, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("pt"))
require.NoError(t, err)
globalStats := dom.StatsHandle().GetPhysicalTableStats(tbl.Meta().ID, tbl.Meta())
require.Equal(t, int64(1200), globalStats.RealtimeCount)

for _, tc := range []struct {
sql string
expectedRowCount int64
}{
{
sql: "select /*+ set_var(tidb_opt_enable_selected_partition_stats=0) */ * from pt where a < 200",
expectedRowCount: 1200,
},
{
sql: "select /*+ set_var(tidb_opt_enable_selected_partition_stats=1) */ * from pt where a < 200",
expectedRowCount: 200,
},
} {
stmt, err := p.ParseOneStmt(tc.sql, "", "")
require.NoError(t, err)
require.NoError(t, executor.ResetContextOfStmt(ctx, stmt))
nodeW := resolve.NewNodeW(stmt)
plan, _, err := planner.Optimize(context.Background(), ctx, nodeW, dom.InfoSchema())
require.NoError(t, err)

reader, ok := plan.(*plannercore.PhysicalTableReader)
require.True(t, ok)
require.Equal(t, tc.expectedRowCount, reader.StatsInfo().HistColl.RealtimeCount)
}
}
4 changes: 3 additions & 1 deletion pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *log
c.visitedtbls[tblID] = struct{}{}
}
c.visitedPhysTblIDs.Insert(int(tblID))
if tblID != ds.PhysicalTableID {
if len(ds.StaticPrunedPartitionIDs) > 0 {
c.tblID2PartitionIDs[tblID] = append(c.tblID2PartitionIDs[tblID], ds.StaticPrunedPartitionIDs...)
} else if tblID != ds.PhysicalTableID {
c.tblID2PartitionIDs[tblID] = append(c.tblID2PartitionIDs[tblID], ds.PhysicalTableID)
}
for _, col := range ds.Schema().Columns {
Expand Down
47 changes: 41 additions & 6 deletions pkg/planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/bits"
"slices"
"sort"
"strconv"
"strings"
Expand All @@ -45,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/opcode"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
core_metrics "github.com/pingcap/tidb/pkg/planner/core/metrics"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
Expand Down Expand Up @@ -4178,10 +4180,11 @@ func addExtraPhysTblIDColumn4DS(ds *logicalop.DataSource) *expression.Column {
// 2. table row count from statistics is zero.
// 3. statistics is outdated.
// Note: please also update getLatestVersionFromStatsTable() when logic in this function changes.
func getStatsTable(ctx base.PlanContext, tblInfo *model.TableInfo, pid int64) *statistics.Table {
func getStatsTable(ctx base.PlanContext, tblInfo *model.TableInfo, pid int64, partitionIDs []int64) *statistics.Table {
statsHandle := domain.GetDomain(ctx).StatsHandle()
var usePartitionStats, countIs0, pseudoStatsForUninitialized, pseudoStatsForOutdated bool
var statsTbl *statistics.Table
selectedPartitionCountsOverridden := false
if ctx.GetSessionVars().StmtCtx.EnableOptimizerDebugTrace {
debugtrace.EnterContextCommon(ctx)
defer func() {
Expand All @@ -4203,7 +4206,30 @@ func getStatsTable(ctx base.PlanContext, tblInfo *model.TableInfo, pid int64) *s
return statistics.PseudoTable(tblInfo, false, true)
}

if pid == tblInfo.ID || ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
if len(partitionIDs) > 0 {
usePartitionStats = true
uniquePartitionStats := make([]*statistics.Table, 0, len(partitionIDs))
seen := make(map[int64]struct{}, len(partitionIDs))
for _, partitionID := range partitionIDs {
if _, ok := seen[partitionID]; ok {
continue
}
seen[partitionID] = struct{}{}
uniquePartitionStats = append(uniquePartitionStats, statsHandle.GetPhysicalTableStats(partitionID, tblInfo))
}
if len(uniquePartitionStats) == 1 {
statsTbl = uniquePartitionStats[0]
} else {
// Reuse the global stats objects and only narrow the table-level counts to the selected partitions.
statsTbl = statsHandle.GetPhysicalTableStats(tblInfo.ID, tblInfo)
if realtimeCount, modifyCount, ok := cardinality.AggregateSelectedPartitionCounts(uniquePartitionStats); ok {
statsTbl = statsTbl.ShallowCopy()
statsTbl.RealtimeCount = realtimeCount
statsTbl.ModifyCount = modifyCount
selectedPartitionCountsOverridden = true
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't anchor merged selected-partition stats on a pseudo global table.

For the multi-partition case, this always starts from tblInfo.ID stats and only overrides the counts. If the global table stats are uninitialized/pseudo while the selected partitions all have non-pseudo stats, the later pseudo checks still win and the optimizer drops back to pseudo stats again. The merged result needs to come from the selected partition stats themselves instead of inheriting the global table's initialization state.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/planner/core/logical_plan_builder.go` around lines 4223 - 4230, The
current logic reuses statsHandle.GetPhysicalTableStats(tblInfo.ID, tblInfo) and
only overrides RealtimeCount/ModifyCount, which causes the merged
selected-partition stats to inherit the global table's pseudo/uninitialized
state; instead, when cardinality.AggregateSelectedPartitionCounts returns ok,
build statsTbl from the selected partitions' merged stats (don’t start from
GetPhysicalTableStats(tblInfo.ID,...)), e.g. obtain or construct a TableStats
object that reflects the selected partitions' initialization/state and set
RealtimeCount and ModifyCount from realtimeCount/modifyCount, then set
selectedPartitionCountsOverridden = true so the optimizer uses the partitions'
real initialization status rather than the global pseudo one.

}
} else if pid == tblInfo.ID || ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
statsTbl = statsHandle.GetPhysicalTableStats(tblInfo.ID, tblInfo)
} else {
usePartitionStats = true
Expand All @@ -4217,6 +4243,10 @@ func getStatsTable(ctx base.PlanContext, tblInfo *model.TableInfo, pid int64) *s
// RealtimeCount to the row count from the ANALYZE, which is fetched from loaded stats in GetAnalyzeRowCount()).
if ctx.GetSessionVars().GetOptObjective() == variable.OptObjectiveDeterminate {
analyzeCount := max(int64(statsTbl.GetAnalyzeRowCount()), 0)
if selectedPartitionCountsOverridden {
// The selected-partition row count is planner-synthesized, so keep it instead of restoring the global analyze count.
analyzeCount = statsTbl.RealtimeCount
}
// If the two fields are already the values we want, we don't need to modify it, and also we don't need to copy.
if statsTbl.RealtimeCount != analyzeCount || statsTbl.ModifyCount != 0 {
// Here is a case that we need specially care about:
Expand Down Expand Up @@ -4524,6 +4554,9 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
}

if tableInfo.GetPartitionInfo() != nil {
hasGlobalIndex := slices.ContainsFunc(tableInfo.Indices, func(idx *model.IndexInfo) bool {
return idx.Global
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
// If `UseDynamicPruneMode` already been false, then we don't need to check whether execute `flagPartitionProcessor`
// otherwise we need to check global stats initialized for each partition table
if !b.ctx.GetSessionVars().IsDynamicPartitionPruneEnabled() {
Expand All @@ -4541,23 +4574,25 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
allowDynamicWithoutStats := fixcontrol.GetBoolWithDefault(b.ctx.GetSessionVars().GetOptimizerFixControlMap(), fixcontrol.Fix44262, skipMissingPartition)

// If dynamic partition prune isn't enabled or global stats is not ready, we won't enable dynamic prune mode in query
usePartitionProcessor := !isDynamicEnabled || (!globalStatsReady && !allowDynamicWithoutStats)
enableStaticPrune := !isDynamicEnabled || (!globalStatsReady && !allowDynamicWithoutStats)

failpoint.Inject("forceDynamicPrune", func(val failpoint.Value) {
if val.(bool) {
if isDynamicEnabled {
usePartitionProcessor = false
enableStaticPrune = false
}
}
})

if usePartitionProcessor {
if enableStaticPrune && !hasGlobalIndex {
b.optFlag = b.optFlag | rule.FlagPartitionProcessor
b.ctx.GetSessionVars().StmtCtx.UseDynamicPruneMode = false
if isDynamicEnabled {
b.ctx.GetSessionVars().StmtCtx.AppendWarning(
fmt.Errorf("disable dynamic pruning due to %s has no global stats", tableInfo.Name.String()))
}
} else if b.ctx.GetSessionVars().EnableSelectedPartitionStats && !hasGlobalIndex {
b.optFlag = b.optFlag | rule.FlagPartitionProcessor
}
}
}
Expand Down Expand Up @@ -4834,7 +4869,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if dirty || tableInfo.TempTableType == model.TempTableLocal || tableInfo.TableCacheStatusType == model.TableCacheStatusEnable {
us := logicalop.LogicalUnionScan{HandleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
us.SetChildren(ds)
if tableInfo.Partition != nil && b.optFlag&rule.FlagPartitionProcessor == 0 {
if tableInfo.Partition != nil {
// Adding ExtraPhysTblIDCol for UnionScan (transaction buffer handling)
// Not using old static prune mode
// Single TableReader for all partitions, needs the PhysTblID from storage
Expand Down
4 changes: 3 additions & 1 deletion pkg/planner/core/operator/logicalop/logical_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ type DataSource struct {
// The data source may be a partition, rather than a real table.
PartitionDefIdx *int
PhysicalTableID int64
PartitionNames []pmodel.CIStr
// StaticPrunedPartitionIDs records the partitions selected by static pruning while keeping the datasource dynamic.
StaticPrunedPartitionIDs []int64
PartitionNames []pmodel.CIStr

// handleCol represents the handle column for the datasource, either the
// int primary key column or extra handle column.
Expand Down
Loading