Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions br/cmd/br/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ go_library(
"//pkg/metrics/common",
"//pkg/parser/terror",
"//pkg/timer/metrics",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_prometheus_client_model//go",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//util/collectors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//channelz/grpc_channelz_v1",
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//test/bufconn",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -61,11 +68,12 @@ go_test(
],
embed = [":metrics"],
flaky = True,
shard_count = 5,
shard_count = 8,
deps = [
"//pkg/parser/terror",
"//pkg/statistics/handle/cache",
"//pkg/testkit/testsetup",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.Cleanup(func(int) { cleanupGrpcChannelzCollectorForTest() }),
}
goleak.VerifyTestMain(m, opts...)
}
153 changes: 153 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
package metrics

import (
"context"
"net"
"sync"

"github.com/pingcap/tidb/pkg/dxf/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/ingestor/ingestmetric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
tikvcollectors "github.com/tikv/client-go/v2/util/collectors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

var (
Expand Down Expand Up @@ -394,6 +403,9 @@ func RegisterMetrics() {
// StmtSummary
prometheus.MustRegister(StmtSummaryWindowRecordCount)
prometheus.MustRegister(StmtSummaryWindowEvictedCount)

// Channelz
setupChannelzCollector()
}

// Register registers custom collectors.
Expand Down Expand Up @@ -458,3 +470,144 @@ func ToggleSimplifiedMode(simplified bool) {
}
}
}

var grpcChannelzCollector struct {
mu sync.Mutex

listener *bufconn.Listener
server *grpc.Server
conn *grpc.ClientConn

collector prometheus.Collector
registered bool
}

func setupChannelzCollector() {
if intest.InTest {
return
}

grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

if err := initGrpcChannelzCollectorLocked(); err != nil {
logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
return
}
if grpcChannelzCollector.registered {
return
}
prometheus.MustRegister(grpcChannelzCollector.collector)
grpcChannelzCollector.registered = true
}
Comment on lines +485 to +502
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Singleton state can drift if prometheus.MustRegister panics.

initGrpcChannelzCollectorLocked() brings the in-process gRPC server, listener, client, and collector to life, but grpcChannelzCollector.registered is only set to true after prometheus.MustRegister(grpcChannelzCollector.collector) succeeds. MustRegister panics on registration errors (e.g., duplicate descriptor / AlreadyRegisteredError), and the panic will propagate out of RegisterMetrics while the gRPC server goroutine, listener, and client connection are left running with registered == false. There is no defer here to roll them back, and subsequent recovery paths (e.g., re-invoking RegisterMetrics or cleanupGrpcChannelzCollectorForTest) would re-enter while the prior server is still alive.

Consider deferring a rollback if registration doesn't complete, e.g.:

🛡️ Suggested guard
-	if err := initGrpcChannelzCollectorLocked(); err != nil {
-		logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
-		return
-	}
-	if grpcChannelzCollector.registered {
-		return
-	}
-	prometheus.MustRegister(grpcChannelzCollector.collector)
-	grpcChannelzCollector.registered = true
+	if err := initGrpcChannelzCollectorLocked(); err != nil {
+		logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
+		return
+	}
+	if grpcChannelzCollector.registered {
+		return
+	}
+	if err := prometheus.Register(grpcChannelzCollector.collector); err != nil {
+		logutil.BgLogger().Warn("register internal channelz collector failed", zap.Error(err))
+		stopGrpcChannelzCollectorLocked()
+		return
+	}
+	grpcChannelzCollector.registered = true
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/metrics/metrics.go` around lines 485 - 502, The setupChannelzCollector
function can leave the in-process gRPC server/listener/client running if
prometheus.MustRegister panics; after calling initGrpcChannelzCollectorLocked(),
add a deferred rollback that runs unless registration completes: create a local
success flag (or similar) immediately after initGrpcChannelzCollectorLocked()
and defer a function that, when the flag is false, calls the existing cleanup
routine used in tests (e.g., cleanupGrpcChannelzCollectorForTest or a new
cleanupGrpcChannelzCollectorLocked helper) to stop the server, close the
listener/client and reset grpcChannelzCollector state; only set the success flag
(and grpcChannelzCollector.registered = true) after
prometheus.MustRegister(grpcChannelzCollector.collector) returns without panic
so the deferred rollback runs on panic/early return.


// initGrpcChannelzCollectorLocked initializes the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func initGrpcChannelzCollectorLocked() error {
if grpcChannelzCollector.collector != nil {
return nil
}

grpcChannelzCollector.listener = bufconn.Listen(1 << 20)
grpcChannelzCollector.server = grpc.NewServer()
service.RegisterChannelzServiceToServer(grpcChannelzCollector.server)
go func(listener *bufconn.Listener, server *grpc.Server) {
if err := server.Serve(listener); err != nil {
logutil.BgLogger().Warn("internal channelz grpc server stopped", zap.Error(err))
}
}(grpcChannelzCollector.listener, grpcChannelzCollector.server)

listener := grpcChannelzCollector.listener
conn, err := grpc.NewClient(
"passthrough:///bufnet",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return listener.DialContext(ctx)
}),
)
if err != nil {
stopGrpcChannelzCollectorLocked()
return err
}

grpcChannelzCollector.conn = conn
grpcChannelzCollector.collector = tikvcollectors.NewChannelzCollector(conn, channelzCollectorOpts())
return nil
}

func channelzCollectorOpts() tikvcollectors.ChannelzCollectorOpts {
return tikvcollectors.ChannelzCollectorOpts{
Namespace: namespace,
Filter: func(node any) (collect bool, walkChildren bool) {
// Only collect socket and leaf subchannel info, which are more useful for troubleshooting network issues.
switch n := node.(type) {
case *grpc_channelz_v1.Channel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
return false, true

case *grpc_channelz_v1.Subchannel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
isLeaf := len(n.GetSocketRef()) > 0 &&
len(n.GetChannelRef()) == 0 &&
len(n.GetSubchannelRef()) == 0

return isLeaf, true

case *grpc_channelz_v1.Socket:
if isInternalChannelzSocket(n) {
return false, false
}
return true, false

default:
return false, true
}
},
}
}

// isInternalChannelzTarget returns true if the target is used for internal channelz collector, which is identified by
// the fact that its target is "bufnet" or "passthrough:///bufnet".
func isInternalChannelzTarget(target string) bool {
return target == "bufnet" || target == "passthrough:///bufnet"
}

// isInternalChannelzSocket returns true if the socket is created by the internal channelz collector for scrapping
// channelz metrics, which is identified by the fact that it has no remote endpoint.
func isInternalChannelzSocket(socket *grpc_channelz_v1.Socket) bool {
return socket.GetRemote() == nil && socket.GetRemoteName() == ""
}

func cleanupGrpcChannelzCollectorForTest() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

stopGrpcChannelzCollectorLocked()
}

// stopGrpcChannelzCollectorLocked stops and resets the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func stopGrpcChannelzCollectorLocked() {
if grpcChannelzCollector.registered && grpcChannelzCollector.collector != nil {
prometheus.Unregister(grpcChannelzCollector.collector)
}
if grpcChannelzCollector.conn != nil {
_ = grpcChannelzCollector.conn.Close()
}
if grpcChannelzCollector.server != nil {
grpcChannelzCollector.server.Stop()
}
if grpcChannelzCollector.listener != nil {
_ = grpcChannelzCollector.listener.Close()
}

grpcChannelzCollector.server = nil
grpcChannelzCollector.listener = nil
grpcChannelzCollector.conn = nil
grpcChannelzCollector.collector = nil
grpcChannelzCollector.registered = false
}
101 changes: 101 additions & 0 deletions pkg/metrics/metrics_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package metrics

import (
"strings"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,3 +66,102 @@ func TestStmtSummaryMetricLabels(t *testing.T) {
require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2)))
require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2)))
}

func TestGrpcChannelzCollectorSingleton(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.NoError(t, initGrpcChannelzCollectorLocked())
firstServer := grpcChannelzCollector.server
firstListener := grpcChannelzCollector.listener
firstConn := grpcChannelzCollector.conn
firstCollector := grpcChannelzCollector.collector

require.NoError(t, initGrpcChannelzCollectorLocked())
require.Same(t, firstServer, grpcChannelzCollector.server)
require.Same(t, firstListener, grpcChannelzCollector.listener)
require.Same(t, firstConn, grpcChannelzCollector.conn)
require.True(t, firstCollector == grpcChannelzCollector.collector)
}()

cleanupGrpcChannelzCollectorForTest()

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.Nil(t, grpcChannelzCollector.server)
require.Nil(t, grpcChannelzCollector.listener)
require.Nil(t, grpcChannelzCollector.conn)
require.Nil(t, grpcChannelzCollector.collector)
require.False(t, grpcChannelzCollector.registered)
}()
}

func TestSetupChannelzCollectorSkippedInTest(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)
require.True(t, intest.InTest)

setupChannelzCollector()

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.Nil(t, grpcChannelzCollector.collector)
require.False(t, grpcChannelzCollector.registered)
}()
}

func TestGrpcChannelzCollectorGather(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)

var collector prometheus.Collector
func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.NoError(t, initGrpcChannelzCollectorLocked())
collector = grpcChannelzCollector.collector
}()

registry := prometheus.NewRegistry()
require.NoError(t, registry.Register(collector))
families, err := registry.Gather()
require.NoError(t, err)

require.NotNil(t, findMetricFamily(families, "tidb_grpc_channelz_fetch_errors_total"))
for _, family := range families {
for _, metric := range family.GetMetric() {
require.False(t, metricHasLabelValue(metric, "target", "bufnet"))
require.False(t, metricHasLabelValue(metric, "target", "passthrough:///bufnet"))
if strings.HasPrefix(family.GetName(), "tidb_grpc_channelz_socket_") {
require.False(t, metricHasLabelValue(metric, "remote", ""))
}
}
}
}

func findMetricFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily {
for _, family := range families {
if family.GetName() == name {
return family
}
}
return nil
}

func metricHasLabelValue(metric *dto.Metric, name string, value string) bool {
for _, label := range metric.GetLabel() {
if label.GetName() == name && label.GetValue() == value {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions pkg/server/handler/extractorhandler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/handler/optimizor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/handler/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/server/tests/commontest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
Loading