diff --git a/br/cmd/br/main_test.go b/br/cmd/br/main_test.go index bdd6f7282ee62..f74d4d3aa1400 100644 --- a/br/cmd/br/main_test.go +++ b/br/cmd/br/main_test.go @@ -49,6 +49,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"), goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), ) } diff --git a/pkg/metrics/BUILD.bazel b/pkg/metrics/BUILD.bazel index 139cd300bf0e4..63f8944438098 100644 --- a/pkg/metrics/BUILD.bazel +++ b/pkg/metrics/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//pkg/metrics/common", "//pkg/parser/terror", "//pkg/timer/metrics", + "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/promutil", "@com_github_pingcap_errors//:errors", @@ -47,6 +48,12 @@ go_library( "@com_github_prometheus_client_golang//prometheus/collectors", "@com_github_prometheus_client_model//go", "@com_github_tikv_client_go_v2//metrics", + "@com_github_tikv_client_go_v2//util/collectors", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//channelz/grpc_channelz_v1", + "@org_golang_google_grpc//channelz/service", + "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//test/bufconn", "@org_uber_go_zap//:zap", ], ) @@ -61,11 +68,12 @@ go_test( ], embed = [":metrics"], flaky = True, - shard_count = 5, + shard_count = 8, deps = [ "//pkg/parser/terror", "//pkg/statistics/handle/cache", "//pkg/testkit/testsetup", + "//pkg/util/intest", "@com_github_pingcap_errors//:errors", "@com_github_prometheus_client_golang//prometheus", "@com_github_prometheus_client_model//go", diff --git a/pkg/metrics/main_test.go b/pkg/metrics/main_test.go index 8687e2cd2291e..63d0b56c4d99a 100644 --- a/pkg/metrics/main_test.go +++ b/pkg/metrics/main_test.go @@ -29,6 +29,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.Cleanup(func(int) { cleanupGrpcChannelzCollectorForTest() }), } goleak.VerifyTestMain(m, opts...) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 547d2f5bba0b7..70fe7be3209f6 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -15,17 +15,26 @@ package metrics import ( + "context" + "net" "sync" "github.com/pingcap/tidb/pkg/dxf/framework/dxfmetric" "github.com/pingcap/tidb/pkg/ingestor/ingestmetric" metricscommon "github.com/pingcap/tidb/pkg/metrics/common" timermetrics "github.com/pingcap/tidb/pkg/timer/metrics" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" tikvmetrics "github.com/tikv/client-go/v2/metrics" + tikvcollectors "github.com/tikv/client-go/v2/util/collectors" "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/channelz/grpc_channelz_v1" + "google.golang.org/grpc/channelz/service" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" ) var ( @@ -394,6 +403,9 @@ func RegisterMetrics() { // StmtSummary prometheus.MustRegister(StmtSummaryWindowRecordCount) prometheus.MustRegister(StmtSummaryWindowEvictedCount) + + // Channelz + setupChannelzCollector() } // Register registers custom collectors. @@ -458,3 +470,144 @@ func ToggleSimplifiedMode(simplified bool) { } } } + +var grpcChannelzCollector struct { + mu sync.Mutex + + listener *bufconn.Listener + server *grpc.Server + conn *grpc.ClientConn + + collector prometheus.Collector + registered bool +} + +func setupChannelzCollector() { + if intest.InTest { + return + } + + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + if err := initGrpcChannelzCollectorLocked(); err != nil { + logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err)) + return + } + if grpcChannelzCollector.registered { + return + } + prometheus.MustRegister(grpcChannelzCollector.collector) + grpcChannelzCollector.registered = true +} + +// initGrpcChannelzCollectorLocked initializes the singleton channelz collector. +// It must be called with grpcChannelzCollector.mu held. +func initGrpcChannelzCollectorLocked() error { + if grpcChannelzCollector.collector != nil { + return nil + } + + grpcChannelzCollector.listener = bufconn.Listen(1 << 20) + grpcChannelzCollector.server = grpc.NewServer() + service.RegisterChannelzServiceToServer(grpcChannelzCollector.server) + go func(listener *bufconn.Listener, server *grpc.Server) { + if err := server.Serve(listener); err != nil { + logutil.BgLogger().Warn("internal channelz grpc server stopped", zap.Error(err)) + } + }(grpcChannelzCollector.listener, grpcChannelzCollector.server) + + listener := grpcChannelzCollector.listener + conn, err := grpc.NewClient( + "passthrough:///bufnet", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return listener.DialContext(ctx) + }), + ) + if err != nil { + stopGrpcChannelzCollectorLocked() + return err + } + + grpcChannelzCollector.conn = conn + grpcChannelzCollector.collector = tikvcollectors.NewChannelzCollector(conn, channelzCollectorOpts()) + return nil +} + +func channelzCollectorOpts() tikvcollectors.ChannelzCollectorOpts { + return tikvcollectors.ChannelzCollectorOpts{ + Namespace: namespace, + Filter: func(node any) (collect bool, walkChildren bool) { + // Only collect socket and leaf subchannel info, which are more useful for troubleshooting network issues. + switch n := node.(type) { + case *grpc_channelz_v1.Channel: + if isInternalChannelzTarget(n.GetData().GetTarget()) { + return false, false + } + return false, true + + case *grpc_channelz_v1.Subchannel: + if isInternalChannelzTarget(n.GetData().GetTarget()) { + return false, false + } + isLeaf := len(n.GetSocketRef()) > 0 && + len(n.GetChannelRef()) == 0 && + len(n.GetSubchannelRef()) == 0 + + return isLeaf, true + + case *grpc_channelz_v1.Socket: + if isInternalChannelzSocket(n) { + return false, false + } + return true, false + + default: + return false, true + } + }, + } +} + +// isInternalChannelzTarget returns true if the target is used for internal channelz collector, which is identified by +// the fact that its target is "bufnet" or "passthrough:///bufnet". +func isInternalChannelzTarget(target string) bool { + return target == "bufnet" || target == "passthrough:///bufnet" +} + +// isInternalChannelzSocket returns true if the socket is created by the internal channelz collector for scrapping +// channelz metrics, which is identified by the fact that it has no remote endpoint. +func isInternalChannelzSocket(socket *grpc_channelz_v1.Socket) bool { + return socket.GetRemote() == nil && socket.GetRemoteName() == "" +} + +func cleanupGrpcChannelzCollectorForTest() { + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + stopGrpcChannelzCollectorLocked() +} + +// stopGrpcChannelzCollectorLocked stops and resets the singleton channelz collector. +// It must be called with grpcChannelzCollector.mu held. +func stopGrpcChannelzCollectorLocked() { + if grpcChannelzCollector.registered && grpcChannelzCollector.collector != nil { + prometheus.Unregister(grpcChannelzCollector.collector) + } + if grpcChannelzCollector.conn != nil { + _ = grpcChannelzCollector.conn.Close() + } + if grpcChannelzCollector.server != nil { + grpcChannelzCollector.server.Stop() + } + if grpcChannelzCollector.listener != nil { + _ = grpcChannelzCollector.listener.Close() + } + + grpcChannelzCollector.server = nil + grpcChannelzCollector.listener = nil + grpcChannelzCollector.conn = nil + grpcChannelzCollector.collector = nil + grpcChannelzCollector.registered = false +} diff --git a/pkg/metrics/metrics_internal_test.go b/pkg/metrics/metrics_internal_test.go index 2c5d5a4203d59..533ab770e3954 100644 --- a/pkg/metrics/metrics_internal_test.go +++ b/pkg/metrics/metrics_internal_test.go @@ -15,9 +15,11 @@ package metrics import ( + "strings" "testing" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -64,3 +66,102 @@ func TestStmtSummaryMetricLabels(t *testing.T) { require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2))) require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2))) } + +func TestGrpcChannelzCollectorSingleton(t *testing.T) { + cleanupGrpcChannelzCollectorForTest() + t.Cleanup(cleanupGrpcChannelzCollectorForTest) + + func() { + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + require.NoError(t, initGrpcChannelzCollectorLocked()) + firstServer := grpcChannelzCollector.server + firstListener := grpcChannelzCollector.listener + firstConn := grpcChannelzCollector.conn + firstCollector := grpcChannelzCollector.collector + + require.NoError(t, initGrpcChannelzCollectorLocked()) + require.Same(t, firstServer, grpcChannelzCollector.server) + require.Same(t, firstListener, grpcChannelzCollector.listener) + require.Same(t, firstConn, grpcChannelzCollector.conn) + require.True(t, firstCollector == grpcChannelzCollector.collector) + }() + + cleanupGrpcChannelzCollectorForTest() + + func() { + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + require.Nil(t, grpcChannelzCollector.server) + require.Nil(t, grpcChannelzCollector.listener) + require.Nil(t, grpcChannelzCollector.conn) + require.Nil(t, grpcChannelzCollector.collector) + require.False(t, grpcChannelzCollector.registered) + }() +} + +func TestSetupChannelzCollectorSkippedInTest(t *testing.T) { + cleanupGrpcChannelzCollectorForTest() + t.Cleanup(cleanupGrpcChannelzCollectorForTest) + require.True(t, intest.InTest) + + setupChannelzCollector() + + func() { + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + require.Nil(t, grpcChannelzCollector.collector) + require.False(t, grpcChannelzCollector.registered) + }() +} + +func TestGrpcChannelzCollectorGather(t *testing.T) { + cleanupGrpcChannelzCollectorForTest() + t.Cleanup(cleanupGrpcChannelzCollectorForTest) + + var collector prometheus.Collector + func() { + grpcChannelzCollector.mu.Lock() + defer grpcChannelzCollector.mu.Unlock() + + require.NoError(t, initGrpcChannelzCollectorLocked()) + collector = grpcChannelzCollector.collector + }() + + registry := prometheus.NewRegistry() + require.NoError(t, registry.Register(collector)) + families, err := registry.Gather() + require.NoError(t, err) + + require.NotNil(t, findMetricFamily(families, "tidb_grpc_channelz_fetch_errors_total")) + for _, family := range families { + for _, metric := range family.GetMetric() { + require.False(t, metricHasLabelValue(metric, "target", "bufnet")) + require.False(t, metricHasLabelValue(metric, "target", "passthrough:///bufnet")) + if strings.HasPrefix(family.GetName(), "tidb_grpc_channelz_socket_") { + require.False(t, metricHasLabelValue(metric, "remote", "")) + } + } + } +} + +func findMetricFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily { + for _, family := range families { + if family.GetName() == name { + return family + } + } + return nil +} + +func metricHasLabelValue(metric *dto.Metric, name string, value string) bool { + for _, label := range metric.GetLabel() { + if label.GetName() == name && label.GetValue() == value { + return true + } + } + return false +} diff --git a/pkg/server/handler/extractorhandler/main_test.go b/pkg/server/handler/extractorhandler/main_test.go index 7a64e8f2fc0ea..095f9372b2824 100644 --- a/pkg/server/handler/extractorhandler/main_test.go +++ b/pkg/server/handler/extractorhandler/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/handler/optimizor/main_test.go b/pkg/server/handler/optimizor/main_test.go index b3220444e2d90..a54b1b640aeec 100644 --- a/pkg/server/handler/optimizor/main_test.go +++ b/pkg/server/handler/optimizor/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/handler/tests/main_test.go b/pkg/server/handler/tests/main_test.go index f35bdc9779f8e..e08f866d77685 100644 --- a/pkg/server/handler/tests/main_test.go +++ b/pkg/server/handler/tests/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/main_test.go b/pkg/server/main_test.go index 8f68148f70ed1..ca0daca7c9439 100644 --- a/pkg/server/main_test.go +++ b/pkg/server/main_test.go @@ -78,6 +78,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"), goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), } diff --git a/pkg/server/tests/commontest/main_test.go b/pkg/server/tests/commontest/main_test.go index 94d058bd8dc86..3b13b2d9f7c4e 100644 --- a/pkg/server/tests/commontest/main_test.go +++ b/pkg/server/tests/commontest/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/tests/cursor/main_test.go b/pkg/server/tests/cursor/main_test.go index 0e6b15755fbd5..c34aa9b80e117 100644 --- a/pkg/server/tests/cursor/main_test.go +++ b/pkg/server/tests/cursor/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/tests/main_test.go b/pkg/server/tests/main_test.go index e4dd64b449d6a..40ff52fc5e4bd 100644 --- a/pkg/server/tests/main_test.go +++ b/pkg/server/tests/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/tests/standby/main_test.go b/pkg/server/tests/standby/main_test.go index 83ba9fcc276a6..6b420333d12b4 100644 --- a/pkg/server/tests/standby/main_test.go +++ b/pkg/server/tests/standby/main_test.go @@ -69,6 +69,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...) diff --git a/pkg/server/tests/tls/main_test.go b/pkg/server/tests/tls/main_test.go index 7d87e42a738aa..9f445ffb39261 100644 --- a/pkg/server/tests/tls/main_test.go +++ b/pkg/server/tests/tls/main_test.go @@ -64,6 +64,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"), + goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"), } goleak.VerifyTestMain(m, opts...)