Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7805,13 +7805,13 @@ def go_deps():
build_tags = ["nextgen", "intest"],
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "a5e70899561f437abf34016a1fd907ab6394ab4771f75ba509b61dd5bb806351",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260414033830-1adc54c38a51",
sha256 = "ea75e563faf04f631d009f91407042e81681ee22730010e721bcdf21eda90bc0",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260508015347-9513b5e765b2",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260508015347-9513b5e765b2.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260508015347-9513b5e765b2.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260508015347-9513b5e765b2.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260508015347-9513b5e765b2.zip",
],
)
go_repository(
Expand Down
2 changes: 2 additions & 0 deletions br/cmd/br/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/tidb-server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ go_test(
srcs = ["main_test.go"],
embed = [":tidb-server_lib"],
flaky = True,
shard_count = 6,
shard_count = 5,
deps = [
"//pkg/config",
"//pkg/config/kerneltype",
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ require (
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.11.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51
github.com/tikv/client-go/v2 v2.0.8-0.20260508015347-9513b5e765b2
github.com/tikv/pd/client v0.0.0-20260401072359-048f0d8f6f71
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67
github.com/twmb/murmur3 v1.1.6
Expand Down Expand Up @@ -353,7 +353,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/protobuf v1.36.10
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.29.11 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51 h1:HiKahyqWSI5aijRsSk1JciFg61D5a0x3TFyleYERBl4=
github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51/go.mod h1:PILS4Yr8mWPD7J6W0+hVq4Z+lwhTIYxPYUA/OTxPSvg=
github.com/tikv/client-go/v2 v2.0.8-0.20260508015347-9513b5e765b2 h1:pQw7LTK6JzWYsNn2aGKzRD9ojPa1ic8fqvc6xgkcXGY=
github.com/tikv/client-go/v2 v2.0.8-0.20260508015347-9513b5e765b2/go.mod h1:rg9c3yf9lfQdj9rt5FvwRP9xDubUY6C9hyH4g9zFH1s=
github.com/tikv/pd/client v0.0.0-20260401072359-048f0d8f6f71 h1:5hCQ6J2fwUpYqIgQGR625bW98wvYS9FUpTiVszIbVSg=
github.com/tikv/pd/client v0.0.0-20260401072359-048f0d8f6f71/go.mod h1:4kxXuAQAREpH+lVbydVwGNNDmcwdj0RG4Ofwky08W/k=
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk=
Expand Down
10 changes: 9 additions & 1 deletion pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ go_library(
"//pkg/metrics/common",
"//pkg/parser/terror",
"//pkg/timer/metrics",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_prometheus_client_model//go",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//util/collectors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//channelz/grpc_channelz_v1",
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//test/bufconn",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -61,11 +68,12 @@ go_test(
],
embed = [":metrics"],
flaky = True,
shard_count = 5,
shard_count = 8,
deps = [
"//pkg/parser/terror",
"//pkg/statistics/handle/cache",
"//pkg/testkit/testsetup",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.Cleanup(func(int) { cleanupGrpcChannelzCollectorForTest() }),
}
goleak.VerifyTestMain(m, opts...)
}
153 changes: 153 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
package metrics

import (
"context"
"net"
"sync"

"github.com/pingcap/tidb/pkg/dxf/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/ingestor/ingestmetric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
tikvcollectors "github.com/tikv/client-go/v2/util/collectors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

var (
Expand Down Expand Up @@ -394,6 +403,9 @@ func RegisterMetrics() {
// StmtSummary
prometheus.MustRegister(StmtSummaryWindowRecordCount)
prometheus.MustRegister(StmtSummaryWindowEvictedCount)

// Channelz
setupChannelzCollector()
}

// Register registers custom collectors.
Expand Down Expand Up @@ -458,3 +470,144 @@ func ToggleSimplifiedMode(simplified bool) {
}
}
}

var grpcChannelzCollector struct {
mu sync.Mutex

listener *bufconn.Listener
server *grpc.Server
conn *grpc.ClientConn

collector prometheus.Collector
registered bool
}

func setupChannelzCollector() {
if intest.InTest {
return
}

grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

if err := initGrpcChannelzCollectorLocked(); err != nil {
logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
return
}
if grpcChannelzCollector.registered {
return
}
prometheus.MustRegister(grpcChannelzCollector.collector)
grpcChannelzCollector.registered = true
}
Comment on lines +485 to +502
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Singleton state can drift if prometheus.MustRegister panics.

initGrpcChannelzCollectorLocked() brings the in-process gRPC server, listener, client, and collector to life, but grpcChannelzCollector.registered is only set to true after prometheus.MustRegister(grpcChannelzCollector.collector) succeeds. MustRegister panics on registration errors (e.g., duplicate descriptor / AlreadyRegisteredError), and the panic will propagate out of RegisterMetrics while the gRPC server goroutine, listener, and client connection are left running with registered == false. There is no defer here to roll them back, and subsequent recovery paths (e.g., re-invoking RegisterMetrics or cleanupGrpcChannelzCollectorForTest) would re-enter while the prior server is still alive.

Consider deferring a rollback if registration doesn't complete, e.g.:

🛡️ Suggested guard
-	if err := initGrpcChannelzCollectorLocked(); err != nil {
-		logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
-		return
-	}
-	if grpcChannelzCollector.registered {
-		return
-	}
-	prometheus.MustRegister(grpcChannelzCollector.collector)
-	grpcChannelzCollector.registered = true
+	if err := initGrpcChannelzCollectorLocked(); err != nil {
+		logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
+		return
+	}
+	if grpcChannelzCollector.registered {
+		return
+	}
+	if err := prometheus.Register(grpcChannelzCollector.collector); err != nil {
+		logutil.BgLogger().Warn("register internal channelz collector failed", zap.Error(err))
+		stopGrpcChannelzCollectorLocked()
+		return
+	}
+	grpcChannelzCollector.registered = true
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/metrics/metrics.go` around lines 485 - 502, The setupChannelzCollector
function can leave the in-process gRPC server/listener/client running if
prometheus.MustRegister panics; after calling initGrpcChannelzCollectorLocked(),
add a deferred rollback that runs unless registration completes: create a local
success flag (or similar) immediately after initGrpcChannelzCollectorLocked()
and defer a function that, when the flag is false, calls the existing cleanup
routine used in tests (e.g., cleanupGrpcChannelzCollectorForTest or a new
cleanupGrpcChannelzCollectorLocked helper) to stop the server, close the
listener/client and reset grpcChannelzCollector state; only set the success flag
(and grpcChannelzCollector.registered = true) after
prometheus.MustRegister(grpcChannelzCollector.collector) returns without panic
so the deferred rollback runs on panic/early return.


// initGrpcChannelzCollectorLocked initializes the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func initGrpcChannelzCollectorLocked() error {
if grpcChannelzCollector.collector != nil {
return nil
}

grpcChannelzCollector.listener = bufconn.Listen(1 << 20)
grpcChannelzCollector.server = grpc.NewServer()
service.RegisterChannelzServiceToServer(grpcChannelzCollector.server)
go func(listener *bufconn.Listener, server *grpc.Server) {
if err := server.Serve(listener); err != nil {
logutil.BgLogger().Warn("internal channelz grpc server stopped", zap.Error(err))
}
}(grpcChannelzCollector.listener, grpcChannelzCollector.server)

listener := grpcChannelzCollector.listener
conn, err := grpc.NewClient(
"passthrough:///bufnet",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return listener.DialContext(ctx)
}),
)
if err != nil {
stopGrpcChannelzCollectorLocked()
return err
}

grpcChannelzCollector.conn = conn
grpcChannelzCollector.collector = tikvcollectors.NewChannelzCollector(conn, channelzCollectorOpts())
return nil
}

func channelzCollectorOpts() tikvcollectors.ChannelzCollectorOpts {
return tikvcollectors.ChannelzCollectorOpts{
Namespace: namespace,
Filter: func(node any) (collect bool, walkChildren bool) {
// Only collect socket and leaf subchannel info, which are more useful for troubleshooting network issues.
switch n := node.(type) {
case *grpc_channelz_v1.Channel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
return false, true

case *grpc_channelz_v1.Subchannel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
isLeaf := len(n.GetSocketRef()) > 0 &&
len(n.GetChannelRef()) == 0 &&
len(n.GetSubchannelRef()) == 0

return isLeaf, true

case *grpc_channelz_v1.Socket:
if isInternalChannelzSocket(n) {
return false, false
}
return true, false

default:
return false, true
}
},
}
}

// isInternalChannelzTarget returns true if the target is used for internal channelz collector, which is identified by
// the fact that its target is "bufnet" or "passthrough:///bufnet".
func isInternalChannelzTarget(target string) bool {
return target == "bufnet" || target == "passthrough:///bufnet"
}

// isInternalChannelzSocket returns true if the socket is created by the internal channelz collector for scrapping
// channelz metrics, which is identified by the fact that it has no remote endpoint.
func isInternalChannelzSocket(socket *grpc_channelz_v1.Socket) bool {
return socket.GetRemote() == nil && socket.GetRemoteName() == ""
}

func cleanupGrpcChannelzCollectorForTest() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

stopGrpcChannelzCollectorLocked()
}

// stopGrpcChannelzCollectorLocked stops and resets the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func stopGrpcChannelzCollectorLocked() {
if grpcChannelzCollector.registered && grpcChannelzCollector.collector != nil {
prometheus.Unregister(grpcChannelzCollector.collector)
}
if grpcChannelzCollector.conn != nil {
_ = grpcChannelzCollector.conn.Close()
}
if grpcChannelzCollector.server != nil {
grpcChannelzCollector.server.Stop()
}
if grpcChannelzCollector.listener != nil {
_ = grpcChannelzCollector.listener.Close()
}

grpcChannelzCollector.server = nil
grpcChannelzCollector.listener = nil
grpcChannelzCollector.conn = nil
grpcChannelzCollector.collector = nil
grpcChannelzCollector.registered = false
}
Loading