Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7766,13 +7766,13 @@ def go_deps():
build_tags = ["nextgen", "intest"],
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "daff9dec412c452d98f2adf2e2c4ac5132e93acd0f773612c69681cd6f76b4d4",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260326064539-728676760deb",
sha256 = "06b88aebfa4f416d2fd77edb70d51f8c590f61f71be6cbfb328e525e9907d027",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260330060945-282ada62b856",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260326064539-728676760deb.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260330060945-282ada62b856.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ require (
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.11.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb h1:ZVxuRRtpKp76G4zNePQ9vLV2UwEf8nJjr6Z2kAHTbmg=
github.com/tikv/client-go/v2 v2.0.8-0.20260326064539-728676760deb/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg=
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856 h1:CJ2alESXl+dW4hQwO1vY5ingIDr6dGRB+8WodZeh24g=
github.com/tikv/client-go/v2 v2.0.8-0.20260330060945-282ada62b856/go.mod h1:2itxkVYz1s+dKsfJZqKcFD2MGUoDxvyy/Yqjd7vkdjg=
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14 h1:TVSx20m6DZMSiI37Dduu9RZb8yUvT1sgW8kCLAe+T5U=
github.com/tikv/pd/client v0.0.0-20260323032024-d7b638033a14/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k=
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk=
Expand Down
11 changes: 8 additions & 3 deletions pkg/distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,14 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}
}

if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil && r.storeType == kv.TiFlash {
if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
if r.storeType == kv.TiFlash {
ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx)
if ruv2Metrics == nil || !ruv2Metrics.Bypass() {
if ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err = execdetails.MergeTiFlashRUConsumption(r.selectResp.GetExecutionSummaries(), ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
}
}
}
}
if copStats.TimeDetail.ProcessTime > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ func recordInsertRows2Metrics(sessVars *variable.SessionVars) {

func (a *ExecStmt) finalizeStatementRUV2Metrics() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.RUV2Metrics == nil {
if sessVars.RUV2Metrics == nil || sessVars.RUV2Metrics.Bypass() {
return
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/executor/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,42 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) {
"update `stmt_summary_retry`%",
).Check(testkit.Rows("0 0"))
})

t.Run("bypass ru skips final reporting", func(t *testing.T) {
reporter := &mockRUV2ConsumptionReporter{}
ctx := &mockRUV2ReportingContext{
Context: mock.NewContext(),
reporter: reporter,
}
sessVars := ctx.GetSessionVars()
sessVars.StartTime = time.Now()
sessVars.StmtCtx.StmtType = "Select"
sessVars.StmtCtx.OriginalSQL = "select 1"
sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL)
sessVars.StmtCtx.ResourceGroupName = "rg1"

goCtx := execdetails.ContextWithInitializedExecDetails(context.Background())
sessVars.RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx)
require.NotNil(t, sessVars.RUV2Metrics)
sessVars.RUV2Metrics.SetBypass(true)
sessVars.RUV2Metrics.AddResultChunkCells(100)

ruDetails := goCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails)
ruDetails.AddTiKVRUV2(12345)
ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 10, WRU: 20})

execStmt := &executor.ExecStmt{
Ctx: ctx,
GoCtx: goCtx,
StmtNode: &ast.SelectStmt{},
}
execStmt.FinishExecuteStmt(0, nil, false)

require.Empty(t, reporter.group)
require.Zero(t, reporter.tikvRUV2)
require.Zero(t, reporter.tidbRUV2)
require.Zero(t, reporter.tiflashRU)
})
}

func TestSlowLogMaxPerSec(t *testing.T) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,9 +784,12 @@ func (c *localMppCoordinator) handleAllReports() error {
RecordOneCopTask(-1, kv.TiFlash, detail)] = 0
}
}
if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
ruv2Metrics := execdetails.RUV2MetricsFromContext(c.ctx)
if ruv2Metrics == nil || !ruv2Metrics.Bypass() {
if ruDetailsRaw := c.ctx.Value(clientutil.RUDetailsCtxKey); ruDetailsRaw != nil {
if err := execdetails.MergeTiFlashRUConsumption(report.executionSummaries, ruDetailsRaw.(*clientutil.RUDetails)); err != nil {
return err
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/server/conn_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,22 @@ func TestCursorWithParams(t *testing.T) {
require.Equal(t, float64(13), reporter.tiflashRU)
})

t.Run("cursor ruv2 bypass skips tracker creation", func(t *testing.T) {
reporter := &mockCursorRUV2ConsumptionReporter{}
goCtx := execdetails.ContextWithInitializedExecDetails(context.Background())
ruv2Metrics := execdetails.RUV2MetricsFromContext(goCtx)
require.NotNil(t, ruv2Metrics)
ruv2Metrics.SetBypass(true)
ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails)
ruDetails.AddTiKVRUV2(11)
tracker := resultset.NewCursorRUV2Tracker(reporter, "rg1", ruv2Metrics, ruDetails, execdetails.RUV2Weights{})
require.Nil(t, tracker)
require.Empty(t, reporter.group)
require.Zero(t, reporter.tikvRUV2)
require.Zero(t, reporter.tidbRUV2)
require.Zero(t, reporter.tiflashRU)
})

t.Run("write chunks skips column access on first next error", func(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func NewCursorRUV2Tracker(
if metrics == nil && ruDetails == nil {
return nil
}
if metrics != nil && metrics.Bypass() {
return nil
}
tracker := &CursorRUV2Tracker{
reporter: reporter,
resourceGroupName: resourceGroupName,
Expand Down
36 changes: 36 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2441,6 +2441,10 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s
ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, ruv2Metrics)
}
sessVars.RUV2Metrics = ruv2Metrics
bypass := shouldBypass(ctx, stmtNode, sessVars)
if ruv2Metrics != nil {
ruv2Metrics.SetBypass(bypass)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if pending := sessVars.RUV2PendingSessionParserTotal.Swap(0); pending > 0 && ruv2Metrics != nil {
ruv2Metrics.AddSessionParserTotal(pending)
}
Expand Down Expand Up @@ -2728,6 +2732,38 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s
return recordSet, nil
}

var isNextGenForRUV2 = kerneltype.IsNextGen

func shouldBypass(ctx context.Context, stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool {
switch kv.GetInternalSourceType(ctx) {
case kv.InternalTxnOthers:
return true
case kv.InternalTxnStats:
return isNextGenForRUV2() && isAnalyzeStatementForRUV2(stmtNode, sessVars)
default:
return false
}
}

func isAnalyzeStatementForRUV2(stmtNode ast.StmtNode, sessVars *variable.SessionVars) bool {
if stmtNode == nil || sessVars == nil {
return false
}
switch stmt := stmtNode.(type) {
case *ast.AnalyzeTableStmt:
return true
case *ast.ExecuteStmt:
prepareStmt, err := plannercore.GetPreparedStmt(stmt, sessVars)
if err != nil {
return false
}
_, ok := prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt)
return ok
default:
return false
}
}

func (s *session) GetSQLExecutor() sqlexec.SQLExecutor {
return s
}
Expand Down
71 changes: 57 additions & 14 deletions pkg/session/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,66 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) {
se, err := createSession(store)
require.NoError(t, err)

_, err = se.ParseWithParams(context.Background(), "select 1")
require.NoError(t, err)
require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load())
t.Run("standalone parse carries into next statement only once", func(t *testing.T) {
_, err = se.ParseWithParams(context.Background(), "select 1")
require.NoError(t, err)
require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load())

stmt, err := se.ParseWithParams(context.Background(), "set @a=1")
require.NoError(t, err)
require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load())
stmt, err := se.ParseWithParams(context.Background(), "set @a=1")
require.NoError(t, err)
require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load())

_, err = se.ExecuteStmt(context.Background(), stmt)
require.NoError(t, err)
require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load())
require.NotNil(t, se.sessionVars.RUV2Metrics)
require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal())
_, err = se.ExecuteStmt(context.Background(), stmt)
require.NoError(t, err)
require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load())
require.NotNil(t, se.sessionVars.RUV2Metrics)
require.Equal(t, int64(1), se.sessionVars.RUV2Metrics.SessionParserTotal())

dctx := se.GetDistSQLCtx()
require.Equal(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics)
require.NotNil(t, dctx.RUV2RPCInterceptor)
dctx := se.GetDistSQLCtx()
require.Same(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics)
require.NotNil(t, dctx.RUV2RPCInterceptor)
})

t.Run("internal others bypass skips parser ru accounting", func(t *testing.T) {
stmt, err := se.ParseWithParams(context.Background(), "set @b=1")
require.NoError(t, err)
require.Equal(t, int64(1), se.sessionVars.RUV2PendingSessionParserTotal.Load())

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
_, err = se.ExecuteStmt(ctx, stmt)
require.NoError(t, err)
require.Zero(t, se.sessionVars.RUV2PendingSessionParserTotal.Load())
require.NotNil(t, se.sessionVars.RUV2Metrics)
require.True(t, se.sessionVars.RUV2Metrics.Bypass())
require.Zero(t, se.sessionVars.RUV2Metrics.SessionParserTotal())
})

t.Run("statement bypass decision follows internal analyze semantics", func(t *testing.T) {
statsCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
origIsNextGenForRUV2 := isNextGenForRUV2
defer func() {
isNextGenForRUV2 = origIsNextGenForRUV2
}()

MustExec(t, se, "use test")
MustExec(t, se, "drop table if exists bypass_prepare")
MustExec(t, se, "create table bypass_prepare (a int)")

stmtID, _, _, err := se.PrepareStmt("analyze table bypass_prepare")
require.NoError(t, err)
prepStmt, err := se.GetSessionVars().GetPreparedStmtByID(stmtID)
require.NoError(t, err)
execAnalyzeStmt := &ast.ExecuteStmt{PrepStmt: prepStmt}

isNextGenForRUV2 = func() bool { return true }
require.True(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars))
require.True(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars))

isNextGenForRUV2 = func() bool { return false }
require.False(t, shouldBypass(statsCtx, &ast.AnalyzeTableStmt{}, se.sessionVars))
require.False(t, shouldBypass(statsCtx, execAnalyzeStmt, se.sessionVars))
require.False(t, shouldBypass(statsCtx, &ast.SelectStmt{}, se.sessionVars))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})
}

func TestCrossKSSessionDistSQLCtxDoesNotExposeTypedNilRUReporter(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions pkg/store/driver/txn/ruv2_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ func TestStatementRUV2RPCInterceptor(t *testing.T) {
require.Equal(t, int64(1), ruv2Metrics.ResourceManagerWriteCnt())
require.Equal(t, int64(9), ruv2Metrics.TiKVStorageProcessedKeysBatchGet())
require.Equal(t, int64(1), ruv2Metrics.TiKVStorageProcessedKeysGet())

t.Run("bypass ru skips interceptor accounting", func(t *testing.T) {
bypassed := execdetails.NewRUV2Metrics()
bypassed.SetBypass(true)
it := NewStatementRUV2RPCInterceptor(bypassed)
require.NotNil(t, it)

wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
return &tikvrpc.Response{
Resp: &kvrpcpb.BatchGetResponse{
ExecDetailsV2: &kvrpcpb.ExecDetailsV2{
RuV2: &kvrpcpb.RUV2{StorageProcessedKeysBatchGet: 4},
},
},
}, nil
})
_, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV})
require.NoError(t, err)
require.Zero(t, bypassed.ResourceManagerReadCnt())
require.Zero(t, bypassed.TiKVStorageProcessedKeysBatchGet())
})
}

func TestStatementRUV2RPCInterceptorNilMetrics(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,24 @@ func TestRUV2MetricsSnapshotCalculateRUValues(t *testing.T) {
require.Zero(t, metrics.CalculateRUValues(zeroScaleWeights))
require.Equal(t, tikvRU+tiflashRU, metrics.TotalRU(zeroScaleWeights, tikvRU, tiflashRU))
})

t.Run("bypass keeps total zero", func(t *testing.T) {
bypassed := NewRUV2Metrics()
bypassed.SetBypass(true)
bypassed.AddResultChunkCells(1000)
bypassed.AddPlanCnt(2)

require.Zero(t, bypassed.CalculateRUValues(weights))
require.Zero(t, bypassed.TotalRU(weights, tikvRU, tiflashRU))
total, detail := FormatRUV2Summary(bypassed, weights, tikvRU, tiflashRU)
require.Empty(t, total)
require.Empty(t, detail)
})

t.Run("nil metrics keep tikv and tiflash ru", func(t *testing.T) {
var nilMetrics *RUV2Metrics
require.Equal(t, tikvRU+tiflashRU, nilMetrics.TotalRU(weights, tikvRU, tiflashRU))
})
}

func TestRUV2MetricsSnapshotFreezesRUValues(t *testing.T) {
Expand Down
Loading
Loading