Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
80c401c
fixup
qw4990 Mar 30, 2026
f81dafa
fixup
qw4990 Mar 30, 2026
df2f2d0
fixup
qw4990 Mar 30, 2026
6068049
fixup
qw4990 Mar 30, 2026
dbd77f5
fixup
qw4990 Mar 30, 2026
9577df1
expression: cache GetType result and remove unnecessary sleep
you06 Mar 2, 2026
cf42f84
expression: optimize CompareInt to skip GetType for constant expressions
you06 Mar 2, 2026
c276a1b
expression: avoid heap allocation in Constant.Eval* by using stack-al…
you06 Mar 2, 2026
79e191f
expression: polish adjacent naming and docs
you06 Mar 6, 2026
ceddb37
executor: add fast path for cached table scan in UnionScanExec
you06 Mar 2, 2026
1d0fd04
executor: batch scan cached table rows
you06 Mar 2, 2026
cf8a94d
rowcodec: cache not-null column mapping in ChunkDecoder
you06 Mar 2, 2026
f224991
executor: fast-path int handle decode in mem reader
you06 Mar 2, 2026
c5a67b1
util: optimize rowcodec DecodeToChunk hot paths
you06 Mar 2, 2026
b475323
tablecodec: reduce allocations in DecodeIndexKV general/v1 paths
you06 Mar 2, 2026
7b1b44e
tablecodec: cache IndexRestoredDecoder and use arena for restored val…
you06 Mar 2, 2026
cba21e2
types: add zero-alloc AppendString for Time and use it in DumpTextRow
you06 Mar 2, 2026
4a0ef83
executor: cache per-scan invariants in memIndexReader to reduce overhead
you06 Mar 2, 2026
743ff08
tablecodec: make restored index decode bytes durability explicit
you06 Mar 5, 2026
3cee7db
codec: handle nil loc in EncodeMySQLTime for timestamp
you06 Mar 6, 2026
b2ff4ca
chunk: avoid duplicate Column puts when chunks contain ref columns
you06 Mar 6, 2026
7991824
rowcodec: validate checksum inputs for raw checksum
you06 Mar 6, 2026
e5f14e1
rowcodec: make raw checksum versioning explicit
you06 Mar 6, 2026
0214ec8
table: add result set cache for cached tables
you06 Mar 3, 2026
6e08a0c
table: invalidate result set cache on lease transitions and writes
you06 Mar 3, 2026
4ebf953
planner: add result set cache eligibility check for cached tables
you06 Mar 3, 2026
95a6afd
planner: add result cache key builder for cached table queries
you06 Mar 3, 2026
ef0e9ef
executor: integrate result set cache into executor pipeline for cache…
you06 Mar 3, 2026
4704e53
executor: add observability for cached table result set cache
you06 Mar 3, 2026
26d6ba4
*: harden result set cache with param bytes verification and edge cas…
you06 Mar 3, 2026
648dec1
result cache: harden key + cacheability checks
you06 Mar 5, 2026
fca847d
executor: harden cached table cache paths
you06 Mar 30, 2026
4fc7a54
planner: harden result cache key and cacheability checks
you06 Mar 30, 2026
860cf64
tables: avoid double memory accounting in cached result put
you06 Mar 30, 2026
e88a25a
table: add pre-decoded datum cache for cached tables
you06 Mar 30, 2026
1dda559
executor: add datum cache fast-path iterator to skip KV decode
you06 Mar 30, 2026
5f6f988
*: make cached table result set cache and datum cache timezone-aware
you06 Mar 30, 2026
f3976d1
executor: add unit tests for memCachedDatumIter
you06 Mar 30, 2026
15b2a3a
*: add pre-decoded index datum cache for cached table index scans
you06 Mar 30, 2026
e45e6d8
executor: optimize cached table scan by reducing redundant copies and…
you06 Mar 30, 2026
678a56c
codec: fix TZ nil handling and keep index decode prealloc
you06 Mar 30, 2026
91fa4e8
tables: fix cached datum decode and result cache accounting
you06 Mar 30, 2026
275b180
tables: pin cached datum/index caches to cacheData generation
you06 Mar 30, 2026
6fc6b99
tables: fill cached datum rows with origin default values
you06 Mar 30, 2026
b456da6
executor: pin cached datum/index caches to cacheData generation
you06 Mar 30, 2026
9653da9
executor: tighten cached datum iter and cache backfill safety
you06 Mar 30, 2026
a930d09
tables: increase cached table size limit from 64MB to 256MB
you06 Mar 30, 2026
9d3cd9e
chore: update bazel file
you06 Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"brie.go",
"brie_utils.go",
"builder.go",
"cached_result_exec.go",
"check_table_index.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -349,6 +350,7 @@ go_test(
"brie_test.go",
"brie_utils_test.go",
"builder_index_join_cleanup_test.go",
"cached_result_exec_test.go",
"checksum_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down Expand Up @@ -379,6 +381,7 @@ go_test(
"inspection_summary_test.go",
"join_pkg_test.go",
"main_test.go",
"mem_reader_test.go",
"memtable_reader_test.go",
"metrics_reader_test.go",
"parallel_apply_test.go",
Expand Down Expand Up @@ -521,6 +524,7 @@ go_test(
"//pkg/util/paging",
"//pkg/util/ranger",
"//pkg/util/regionsplit",
"//pkg/util/rowcodec",
"//pkg/util/sem",
"//pkg/util/sem/v2:sem",
"//pkg/util/set",
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,10 @@ func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
} else {
// For non-prepared queries, wrap with result set cache here.
// Prepared statements are wrapped inside ExecuteExec.Build().
e = b.wrapWithResultCache(e, a.StmtNode, a.Plan)
}
a.isSelectForUpdate = b.hasLock && (!stmtCtx.InDeleteStmt && !stmtCtx.InUpdateStmt && !stmtCtx.InInsertStmt)
return e, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/adapter_slow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func SetSlowLogItems(a *ExecStmt, txnTS uint64, hasMoreResults bool, items *vari
items.ResultRows = stmtCtx.GetResultRowsCount()
items.IsExplicitTxn = sessVars.TxnCtx.IsExplicit
items.IsWriteCacheTable = stmtCtx.WaitLockLeaseTime > 0
items.ResultCacheHit = stmtCtx.ReadFromResultCache
items.UsedStats = stmtCtx.GetUsedStatsInfo(false)
items.IsSyncStatsFailed = stmtCtx.IsSyncStatsFailed
items.Warnings = variable.CollectWarningsForSlowLog(stmtCtx)
Expand Down
178 changes: 176 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ type executorBuilder struct {
// Used when building MPPGather.
encounterUnionScan bool

// cachedTbl is set when a cached table's KV cache is hit during building.
// Used to attach a result set cache wrapper around the final executor.
cachedTbl table.CachedTable
// cachedTblID records the first cached table ID observed in the plan.
// Used to disable result cache when multiple cached tables are involved.
cachedTblID int64
// disableResultCache indicates the plan touches multiple cached tables.
// Result set cache entries are scoped to a single cached table instance.
disableResultCache bool

// stmtCtxLock guards statement context and telemetry updates when executor building happens concurrently.
// It is only set for dataReaderBuilder instances used by index join inner workers.
stmtCtxLock *sync.Mutex
Expand Down Expand Up @@ -1492,6 +1502,61 @@ func collectColIdxFromByItems(byItems []*plannerutil.ByItems, cols []*model.Colu
return colIdxs, nil
}

// removeRedundantAccessConditions removes access conditions from allConds that are
// already satisfied by the index kvRanges and don't need to be reserved (i.e., the
// condition references only full-length index columns). This mirrors the ranger's
// shouldReserve logic: a condition is safe to remove only when all its referenced
// columns are full-length index columns (IdxColLens[i] == types.UnspecifiedLength
// or IdxColLens[i] == col.GetFlen()).
func removeRedundantAccessConditions(
allConds []expression.Expression,
accessConds []expression.Expression,
idxCols []*expression.Column,
idxColLens []int,
evalCtx expression.EvalContext,
) []expression.Expression {
// Build a set of full-length index column UniqueIDs.
fullLenColIDs := make(map[int64]struct{}, len(idxCols))
for i, col := range idxCols {
if i < len(idxColLens) {
length := idxColLens[i]
if length == types.UnspecifiedLength || length == col.GetType(evalCtx).GetFlen() {
fullLenColIDs[col.UniqueID] = struct{}{}
}
}
}

// Build the set of canonical hash codes for access conditions that are safe to remove.
safeToRemove := make(map[string]struct{}, len(accessConds))
for _, ac := range accessConds {
cols := expression.ExtractColumns(ac)
allFullLen := true
for _, col := range cols {
if _, ok := fullLenColIDs[col.UniqueID]; !ok {
allFullLen = false
break
}
}
if allFullLen {
// CanonicalHashCode caches inside expression objects and is not goroutine-safe.
// Clone before hashing to avoid data races when building executors concurrently.
safeToRemove[string(ac.Clone().CanonicalHashCode())] = struct{}{}
}
}
if len(safeToRemove) == 0 {
return allConds
}

// Filter out conditions whose canonical hash matches a safe-to-remove access condition.
result := make([]expression.Expression, 0, len(allConds))
for _, cond := range allConds {
if _, ok := safeToRemove[string(cond.Clone().CanonicalHashCode())]; !ok {
result = append(result, cond)
}
}
return result
}

// buildUnionScanFromReader builds union scan executor from child executor.
// Note that this function may be called by inner workers of index lookup join concurrently.
// Be careful to avoid data race.
Expand Down Expand Up @@ -1577,6 +1642,18 @@ func (b *executorBuilder) buildUnionScanFromReader(reader exec.Executor, v *phys
}
}
us.conditions, us.conditionsWithVirCol = physicalop.SplitSelCondsWithVirtualColumn(v.Conditions)
// Remove access conditions already satisfied by kvRanges to avoid redundant EvalBool.
if idxReader, ok := v.Children()[0].(*physicalop.PhysicalIndexReader); ok {
if idxScan, ok := idxReader.IndexPlans[0].(*physicalop.PhysicalIndexScan); ok {
if len(idxScan.AccessCondition) > 0 && len(us.conditions) > 0 {
us.conditions = removeRedundantAccessConditions(
us.conditions, idxScan.AccessCondition,
idxScan.IdxCols, idxScan.IdxColLens,
b.ctx.GetExprCtx().GetEvalCtx(),
)
}
}
}
us.columns = x.columns
us.partitionIDMap = x.partitionIDMap
us.table = x.table
Expand Down Expand Up @@ -1648,6 +1725,7 @@ type bypassDataSourceExecutor interface {
func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourceExecutor, vars *variable.SessionVars, startTS uint64) {
tbl := x.Table()
if tbl.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
b.observeCachedTable(tbl.Meta().ID)
cachedTable := tbl.(table.CachedTable)
// Determine whether the cache can be used.
leaseDuration := time.Duration(vardef.TableCacheLease.Load()) * time.Second
Expand All @@ -1656,6 +1734,41 @@ func (us *UnionScanExec) handleCachedTable(b *executorBuilder, x bypassDataSourc
vars.StmtCtx.ReadFromTableCache = true
x.setDummy()
us.cacheTable = cacheData
// Prefer caches pinned to the same cacheData generation as cacheTable.
if dcp, ok := cachedTable.(interface {
GetCachedDatumDataForMemBuffer(kv.MemBuffer) *tables.CachedDatumData
}); ok {
us.datumCache = dcp.GetCachedDatumDataForMemBuffer(cacheData)
} else if dcp, ok := cachedTable.(interface {
GetCachedDatumData() *tables.CachedDatumData
}); ok {
us.datumCache = dcp.GetCachedDatumData()
}
// Prefer index caches pinned to the same cacheData generation as cacheTable.
if icp, ok := cachedTable.(interface {
GetCachedIndexDatumDataForMemBuffer(kv.MemBuffer, int64) *tables.CachedIndexDatumData
}); ok {
for _, idx := range tbl.Meta().Indices {
if dc := icp.GetCachedIndexDatumDataForMemBuffer(cacheData, idx.ID); dc != nil {
if us.indexDatumCaches == nil {
us.indexDatumCaches = make(map[int64]*tables.CachedIndexDatumData)
}
us.indexDatumCaches[idx.ID] = dc
}
}
} else if icp, ok := cachedTable.(interface {
GetCachedIndexDatumData(int64) *tables.CachedIndexDatumData
}); ok {
for _, idx := range tbl.Meta().Indices {
if dc := icp.GetCachedIndexDatumData(idx.ID); dc != nil {
if us.indexDatumCaches == nil {
us.indexDatumCaches = make(map[int64]*tables.CachedIndexDatumData)
}
us.indexDatumCaches[idx.ID] = dc
}
}
}
b.recordCachedTable(cachedTable)
} else if loading {
return
} else if !b.inUpdateStmt && !b.inDeleteStmt && !b.inInsertStmt && !vars.StmtCtx.InExplainStmt {
Expand Down Expand Up @@ -6292,20 +6405,81 @@ func (b *executorBuilder) getCacheTable(tblInfo *model.TableInfo, startTS uint64
return nil
}
sessVars := b.ctx.GetSessionVars()
b.observeCachedTable(tblInfo.ID)
leaseDuration := time.Duration(vardef.TableCacheLease.Load()) * time.Second
cacheData, loading := tbl.(table.CachedTable).TryReadFromCache(startTS, leaseDuration)
cachedTable := tbl.(table.CachedTable)
cacheData, loading := cachedTable.TryReadFromCache(startTS, leaseDuration)
if cacheData != nil {
sessVars.StmtCtx.ReadFromTableCache = true
b.recordCachedTable(cachedTable)
return cacheData
} else if loading {
return nil
}
if !b.ctx.GetSessionVars().StmtCtx.InExplainStmt && !b.inDeleteStmt && !b.inUpdateStmt {
tbl.(table.CachedTable).UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
cachedTable.UpdateLockForRead(context.Background(), b.ctx.GetStore(), startTS, leaseDuration)
}
return nil
}

func (b *executorBuilder) observeCachedTable(tableID int64) {
if b.disableResultCache || tableID == 0 {
return
}
if b.cachedTblID == 0 {
b.cachedTblID = tableID
return
}
if b.cachedTblID != tableID {
b.disableResultCache = true
b.cachedTbl = nil
}
}

func (b *executorBuilder) recordCachedTable(cachedTable table.CachedTable) {
if cachedTable == nil {
return
}
meta := cachedTable.Meta()
if meta == nil {
b.disableResultCache = true
b.cachedTbl = nil
return
}
b.observeCachedTable(meta.ID)
if b.disableResultCache || b.cachedTbl != nil {
return
}
b.cachedTbl = cachedTable
}

// wrapWithResultCache wraps the top-level executor with CachedResultExec when
// the query is eligible for result set caching on a cached table.
func (b *executorBuilder) wrapWithResultCache(e exec.Executor, stmtNode ast.StmtNode, plan base.Plan) exec.Executor {
if b.cachedTbl == nil || b.disableResultCache {
return e
}
inDML := b.inUpdateStmt || b.inDeleteStmt || b.inInsertStmt
physPlan, ok := plan.(base.PhysicalPlan)
if !ok {
return e
}
if !plannercore.CanCacheResultSet(stmtNode, physPlan, inDML) {
return e
}
key, paramBytes, ok := plannercore.BuildResultCacheKey(b.ctx)
if !ok {
return e
}
return &CachedResultExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, e.Schema(), physPlan.ID(), e),
original: e,
cachedTable: b.cachedTbl,
cacheKey: key,
paramBytes: paramBytes,
}
}

func (b *executorBuilder) buildCompactTable(v *plannercore.CompactTable) exec.Executor {
if v.ReplicaKind != ast.CompactReplicaKindTiFlash && v.ReplicaKind != ast.CompactReplicaKindAll {
b.err = errors.Errorf("compact %v replica is not supported", strings.ToLower(string(v.ReplicaKind)))
Expand Down
Loading