Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,9 @@ type RUV2Config struct {
ExecutorL2 float64 `toml:"executor-l2" json:"executor-l2"`
// ExecutorL3 is the weight for heavier operators: Sort and StreamAgg.
ExecutorL3 float64 `toml:"executor-l3" json:"executor-l3"`
// ExecutorL5InsertRows is the per-row weight for insert work. Level 4 is
// intentionally unused today because only L1/L2/L3 executor groups and this
// insert-specific tier are currently modeled.
// ExecutorL5InsertRows is the weight for insert rows multiplied by inserted
// column count. Level 4 is intentionally unused today because only L1/L2/L3
// executor groups and this insert-specific tier are currently modeled.
ExecutorL5InsertRows float64 `toml:"executor-l5-insert-rows" json:"executor-l5-insert-rows"`
PlanCnt float64 `toml:"plan-cnt" json:"plan-cnt"`
PlanDeriveStatsPaths float64 `toml:"plan-derive-stats-paths" json:"plan-derive-stats-paths"`
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,9 @@ executor-l1 = 0.00013278
executor-l2 = 0.00000383
# executor-l3 covers heavier operators: Sort and StreamAgg.
executor-l3 = 0.00141739
# executor-l5-insert-rows is the per-row insert weight. Level 4 is
# intentionally unused today because only L1/L2/L3 executor groups and this
# insert-specific tier are currently modeled.
# executor-l5-insert-rows is the weight for insert rows multiplied by inserted
# column count. Level 4 is intentionally unused today because only L1/L2/L3
# executor groups and this insert-specific tier are currently modeled.
executor-l5-insert-rows = 0.00472572
plan-cnt = 0.15392217
plan-derive-stats-paths = 0.24968182
Expand Down
23 changes: 3 additions & 20 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,6 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.StmtCtx.SetPlan(a.Plan)
}

a.recordInsertRows2Metrics()
a.finalizeStatementRUV2Metrics()
a.updateNetworkTrafficStatsAndMetrics()
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
Expand Down Expand Up @@ -1680,30 +1679,14 @@ func (a *ExecStmt) recordAffectedRows2Metrics() {
}
}

func (a *ExecStmt) recordInsertRows2Metrics() {
recordInsertRows2Metrics(a.Ctx.GetSessionVars())
}

func recordInsertRows2Metrics(sessVars *variable.SessionVars) {
stmtCtx := sessVars.StmtCtx
if stmtCtx.StmtType != "Insert" {
return
}
// EXPLAIN ANALYZE INSERT snapshots RU before FinishExecuteStmt runs, while the final statement reporting
// still goes through FinishExecuteStmt. Keep this accounting idempotent so both paths can share it safely.
if stmtCtx.InsertRowsAsRUV2Recorded {
return
}

affectedRows := stmtCtx.AffectedRows()
if affectedRows <= 0 {
func recordInsertRowsColMultiply2Metrics(sessVars *variable.SessionVars, rowsColMultiply int64) {
if rowsColMultiply <= 0 {
return
}

if sessVars.RUV2Metrics != nil {
sessVars.RUV2Metrics.AddExecutorL5InsertRows(int64(affectedRows))
sessVars.RUV2Metrics.AddExecutorL5InsertRows(rowsColMultiply)
}
stmtCtx.InsertRowsAsRUV2Recorded = true
}

func (a *ExecStmt) finalizeStatementRUV2Metrics() {
Expand Down
42 changes: 42 additions & 0 deletions pkg/executor/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,3 +813,45 @@ func TestMaxExecutionTimeIncludesTSOWaitTime(t *testing.T) {
})
}
}

func TestInsertRowsColMultiplyRUV2SQLPath(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int primary key, b int, c int)")
tk.MustExec("create table src(a int primary key, b int, c int)")
tk.MustExec("insert into src values (10, 11, 12), (20, 21, 22)")

runInsert := func(sql string) int64 {
ctx := execdetails.ContextWithInitializedExecDetails(context.Background())
tk.MustExecWithContext(ctx, sql)
metrics := execdetails.RUV2MetricsFromContext(ctx)
require.NotNil(t, metrics)
return metrics.ExecutorL5InsertRows()
}

require.Equal(t, int64(6), runInsert("insert into t values (1, 2, 3), (2, 3, 4)"))
require.Equal(t, int64(4), runInsert("insert into t(a, c) values (3, 5), (4, 6)"))
require.Equal(t, int64(4), runInsert("insert into t(a, b) select a, b from src"))

oldEnableBatchDML := vardef.EnableBatchDML.Load()
vardef.EnableBatchDML.Store(true)
defer vardef.EnableBatchDML.Store(oldEnableBatchDML)

tk.MustExec("set @@session.tidb_batch_insert=1")
tk.MustExec("set @@session.tidb_dml_batch_size=2")
tk.MustExec("create table batch_t(a int primary key, b int, c int)")
tk.MustExec("insert into batch_t values (100, 100, 100)")

ctx := execdetails.ContextWithInitializedExecDetails(context.Background())
_, err := tk.ExecWithContext(ctx, "insert into batch_t values (1, 2, 3), (2, 3, 4), (100, 5, 6), (3, 4, 5)")
require.Error(t, err)
metrics := execdetails.RUV2MetricsFromContext(ctx)
require.NotNil(t, metrics)
require.Equal(t, int64(12), metrics.ExecutorL5InsertRows())
tk.MustQuery("select a, b, c from batch_t order by a").Check(testkit.Rows(
"1 2 3",
"2 3 4",
"100 100 100",
))
}
1 change: 0 additions & 1 deletion pkg/executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
}
// Register the RU runtime stats to the runtime stats collection after the analyze executor has been executed.
if e.explain.Analyze && e.analyzeExec != nil && e.executed {
recordInsertRows2Metrics(e.Ctx().GetSessionVars())
ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey)
if coll := e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl; coll != nil {
var ruDetails *clientutil.RUDetails
Expand Down
34 changes: 29 additions & 5 deletions pkg/executor/explain_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/execdetails"
Expand All @@ -43,6 +44,30 @@ type mockErrorOperator struct {
closed bool
}

func TestInsertRowsColMultiplyRUV2Metrics(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx.StmtType = "Insert"
ctx.GetSessionVars().RUV2Metrics = execdetails.NewRUV2Metrics()

insertValues := &InsertValues{
BaseExecutor: exec.NewBaseExecutor(ctx, nil, 0),
rowCount: 4,
recordRUV2RowsColMultiply: true,
insertColumns: make([]*table.Column, 3),
}
require.Equal(t, int64(12), insertValues.rowsColMultiply())

insertValues.recordRowsColMultiply2RUV2Metrics()
require.Equal(t, int64(12), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows())

insertValues.recordRowsColMultiply2RUV2Metrics()
require.Equal(t, int64(12), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows())

insertValues.rowCount = 5
insertValues.recordRowsColMultiply2RUV2Metrics()
require.Equal(t, int64(15), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows())
}

type mockEmptyOperator struct {
exec.BaseExecutor
}
Expand Down Expand Up @@ -114,10 +139,9 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) {
require.EqualError(t, err, "next panic, close error")
require.True(t, mockOpr.closed)

t.Run("insert ru snapshot is complete before finish and remains idempotent", func(t *testing.T) {
t.Run("insert ru snapshot is complete before finish", func(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx.StmtType = "Insert"
ctx.GetSessionVars().StmtCtx.AddAffectedRows(5)
ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)

goCtx := execdetails.ContextWithInitializedExecDetails(context.Background())
Expand All @@ -137,16 +161,16 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) {
analyzeExec: analyzeExec,
}

recordInsertRowsColMultiply2Metrics(ctx.GetSessionVars(), 15)
require.NoError(t, explainExec.executeAnalyzeExec(goCtx))

metrics := ctx.GetSessionVars().RUV2Metrics
require.Equal(t, int64(5), metrics.ExecutorL5InsertRows())
require.Equal(t, int64(15), metrics.ExecutorL5InsertRows())
// DefaultRUVersion is v1 (no domain in unit test), so RU stats show RRU+WRU format.
// Verify the stats are registered and contain "RU:" prefix.
rootStatsStr := ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(targetPlan.ID()).String()
require.Contains(t, rootStatsStr, "RU:")

recordInsertRows2Metrics(ctx.GetSessionVars())
require.Equal(t, int64(5), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows())
require.Equal(t, int64(15), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows())
})
}
1 change: 1 addition & 0 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.collectRuntimeStatsEnabled() {
ctx = context.WithValue(ctx, autoid.AllocatorRuntimeStatsCtxKey, e.stats.AllocatorRuntimeStats)
}
e.recordRUV2RowsColMultiply = true

if !e.EmptyChildren() && e.Children(0) != nil {
return insertRowsFromSelect(ctx, e)
Expand Down
40 changes: 36 additions & 4 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ import (
type InsertValues struct {
exec.BaseExecutor

rowCount uint64
curBatchCnt uint64
maxRowsInBatch uint64
lastInsertID uint64
rowCount uint64
curBatchCnt uint64
maxRowsInBatch uint64
lastInsertID uint64
recordRUV2RowsColMultiply bool
ruv2RecordedRowsColMultiply int64

SelectExec exec.Executor

Expand Down Expand Up @@ -99,6 +101,32 @@ type InsertValues struct {
ignoreErr bool
}

func (e *InsertValues) rowsColMultiply() int64 {
colCount := len(e.insertColumns)
if e.rowCount == 0 || colCount == 0 {
return 0
}

const maxInt64 = uint64(1<<63 - 1)
if e.rowCount > maxInt64/uint64(colCount) {
return int64(maxInt64)
}
return int64(e.rowCount * uint64(colCount))
}

func (e *InsertValues) recordRowsColMultiply2RUV2Metrics() {
if !e.recordRUV2RowsColMultiply {
return
}
current := e.rowsColMultiply()
delta := current - e.ruv2RecordedRowsColMultiply
if delta <= 0 {
return
}
recordInsertRowsColMultiply2Metrics(e.Ctx().GetSessionVars(), delta)
e.ruv2RecordedRowsColMultiply = current
}

type defaultVal struct {
val types.Datum
// valid indicates whether the val is evaluated. We evaluate the default value lazily.
Expand Down Expand Up @@ -234,6 +262,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
if err = base.exec(ctx, rows); err != nil {
return err
}
e.recordRowsColMultiply2RUV2Metrics()
rows = rows[:0]
memTracker.Consume(-memUsageOfRows)
memUsageOfRows = 0
Expand All @@ -255,6 +284,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
if err != nil {
return err
}
e.recordRowsColMultiply2RUV2Metrics()
memTracker.Consume(-memUsageOfRows)
return nil
}
Expand Down Expand Up @@ -505,6 +535,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
if err = base.exec(ctx, rows); err != nil {
return err
}
e.recordRowsColMultiply2RUV2Metrics()
rows = rows[:0]
extraColsInSel = extraColsInSel[:0]
totalMemDelta += -memUsageOfRows - memUsageOfExtraCols
Expand All @@ -526,6 +557,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
if err != nil {
return err
}
e.recordRowsColMultiply2RUV2Metrics()
rows = rows[:0]
extraColsInSel = extraColsInSel[:0]
memTracker.Consume(-memUsageOfRows - memUsageOfExtraCols - chkMemUsage)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/internal/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ go_test(
],
embed = [":exec"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/domain",
"//pkg/kv",
Expand Down
23 changes: 16 additions & 7 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ type ruv2ExecutorMetric struct {
// by both RU v2 statement metrics and the configurable RU v2 weights.
//
// L1: BatchPointGet, PointGet, Limit.
// L2: HashAgg, HashJoin, IndexLookUpJoin, IndexLookUpExecutor,
// IndexReaderExecutor, MemTableReaderExec, SelectionExec, TableDualExec,
// TableReaderExecutor, UnionScanExec, SelectLockExec.
// L2: Expand, HashAgg, HashJoin, IndexLookUpJoin, IndexLookUpExecutor,
// IndexLookUpMergeJoin, IndexNestedLoopHashJoin, IndexReaderExecutor,
// MemTableReaderExec, MergeJoin, Projection, SelectionExec, TableDualExec,
// TableReaderExecutor, TopN, UnionScanExec, SelectLockExec, Window.
// L3: Sort, StreamAgg.
// L4: intentionally unused today.
// L5: reserved for insert-row accounting outside this executor map.
Expand All @@ -110,18 +111,26 @@ var ruv2ExecutorMetricByType = map[string]ruv2ExecutorMetric{
"*executor.PointGetExecutor": {level: 1, label: "PointGetExecutor", useCells: true},
"*executor.LimitExec": {level: 1, label: "LimitExec", useCells: true},
"*aggregate.HashAggExec": {level: 2, label: "HashAggExec", useCells: false},
"*executor.HashJoinExec": {level: 2, label: "HashJoinExec", useCells: false},
"*executor.IndexLookUpJoin": {level: 2, label: "IndexLookUpJoin", useCells: true},
"*executor.ExpandExec": {level: 2, label: "ExpandExec", useCells: false},
"*executor.IndexLookUpExecutor": {level: 2, label: "IndexLookUpExecutor", useCells: false},
"*executor.IndexReaderExecutor": {level: 2, label: "IndexReaderExecutor", useCells: false},
"*executor.MemTableReaderExec": {level: 2, label: "MemTableReaderExec", useCells: false},
"*executor.ProjectionExec": {level: 2, label: "ProjectionExec", useCells: true},
"*executor.SelectionExec": {level: 2, label: "SelectionExec", useCells: false},
"*executor.SelectLockExec": {level: 2, label: "SelectLockExec", useCells: true},
"*executor.TableDualExec": {level: 2, label: "TableDualExec", useCells: false},
"*executor.TableReaderExecutor": {level: 2, label: "TableReaderExecutor", useCells: false},
"*executor.UnionScanExec": {level: 2, label: "UnionScanExec", useCells: false},
"*executor.SelectLockExec": {level: 2, label: "SelectLockExec", useCells: true},
"*executor.SortExec": {level: 3, label: "SortExec", useCells: true},
"*executor.WindowExec": {level: 2, label: "WindowExec", useCells: false},
"*join.HashJoinV1Exec": {level: 2, label: "HashJoinV1Exec", useCells: false},
"*join.HashJoinV2Exec": {level: 2, label: "HashJoinV2Exec", useCells: false},
"*join.IndexLookUpJoin": {level: 2, label: "IndexLookUpJoin", useCells: true},
"*join.IndexLookUpMergeJoin": {level: 2, label: "IndexLookUpMergeJoin", useCells: true},
"*join.IndexNestedLoopHashJoin": {level: 2, label: "IndexNestedLoopHashJoin", useCells: true},
"*join.MergeJoinExec": {level: 2, label: "MergeJoinExec", useCells: false},
"*sortexec.TopNExec": {level: 2, label: "TopNExec", useCells: true},
"*aggregate.StreamAggExec": {level: 3, label: "StreamAggExec", useCells: false},
"*sortexec.SortExec": {level: 3, label: "SortExec", useCells: true},
}

func addRUV2ExecutorMetricWithInfo(ctx context.Context, info ruv2ExecutorMetric, useCells bool, delta int64) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/executor/internal/exec/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,32 @@ func TestNextIOAccAddInputCountsRowsWithZeroCols(t *testing.T) {
require.True(t, needNextIOAcc(false, parentAcc, 1))
})
}

func TestRUV2ExecutorMetricByTypeIncludesConcreteExecutorTypes(t *testing.T) {
cases := map[string]ruv2ExecutorMetric{
"*executor.ProjectionExec": {level: 2, label: "ProjectionExec", useCells: true},
"*join.HashJoinV1Exec": {level: 2, label: "HashJoinV1Exec", useCells: false},
"*join.HashJoinV2Exec": {level: 2, label: "HashJoinV2Exec", useCells: false},
"*join.IndexLookUpJoin": {level: 2, label: "IndexLookUpJoin", useCells: true},
"*join.IndexLookUpMergeJoin": {level: 2, label: "IndexLookUpMergeJoin", useCells: true},
"*join.IndexNestedLoopHashJoin": {level: 2, label: "IndexNestedLoopHashJoin", useCells: true},
"*join.MergeJoinExec": {level: 2, label: "MergeJoinExec", useCells: false},
"*sortexec.SortExec": {level: 3, label: "SortExec", useCells: true},
"*sortexec.TopNExec": {level: 2, label: "TopNExec", useCells: true},
}

for typ, expected := range cases {
actual, ok := ruv2ExecutorMetricByType[typ]
require.True(t, ok, typ)
require.Equal(t, expected, actual)
}

for _, staleType := range []string{
"*executor.HashJoinExec",
"*executor.IndexLookUpJoin",
"*executor.SortExec",
} {
_, ok := ruv2ExecutorMetricByType[staleType]
require.False(t, ok, staleType)
}
}
6 changes: 0 additions & 6 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,12 +486,6 @@ type StatementContext struct {
// IsExplainAnalyzeDML is true if the statement is "explain analyze DML executors", before responding the explain
// results to the client, the transaction should be committed first. See issue #37373 for more details.
IsExplainAnalyzeDML bool
// InsertRowsAsRUV2Recorded tracks whether the statement-level insert-row RUv2 cost has already been
// applied to RUV2Metrics. This must stay idempotent because EXPLAIN ANALYZE INSERT snapshots RU before
// FinishExecuteStmt runs, while FinishExecuteStmt still needs to reuse the same accounting path for the
// final slow-log and resource-group reporting.
InsertRowsAsRUV2Recorded bool

// InHandleForeignKeyTrigger indicates currently are handling foreign key trigger.
InHandleForeignKeyTrigger bool

Expand Down
Loading