Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6530,13 +6530,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "a16a464c372f295061deaf6134a9e26602ebf780beb9d7d3324fb8d1e333eace",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20260326084500-678ff92b1edd",
sha256 = "dcd969a369b184787231d8b6600490c23bdd327e3d405da6a6c309bf5ea3b4b1",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20260408021215-335c5c64af53",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260326084500-678ff92b1edd.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260326084500-678ff92b1edd.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260326084500-678ff92b1edd.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260326084500-678ff92b1edd.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260408021215-335c5c64af53.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260408021215-335c5c64af53.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260408021215-335c5c64af53.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20260408021215-335c5c64af53.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7805,13 +7805,13 @@ def go_deps():
build_tags = ["nextgen", "intest"],
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "46ee8c64e0f95ad1514e824506e49ea1d9f75329ee6419aae4b0817336550557",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260401083018-b7f9a9e9d2ab",
sha256 = "4931d629046c60e96119106a907e473e847fef5e2d15a65253f39e41a95b013a",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260415115642-0eed1ff3c43e",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260415115642-0eed1ff3c43e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260415115642-0eed1ff3c43e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260415115642-0eed1ff3c43e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260415115642-0eed1ff3c43e.zip",
],
)
go_repository(
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20260310054046-9c8b3586e4b2
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9
github.com/pingcap/metering_sdk v0.0.0-20260324055927-14fead745f1d
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
Expand All @@ -124,7 +124,7 @@ require (
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.11.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab
github.com/tikv/client-go/v2 v2.0.8-0.20260415115642-0eed1ff3c43e
github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67
github.com/twmb/murmur3 v1.1.6
Expand Down Expand Up @@ -353,7 +353,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/protobuf v1.36.10
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apimachinery v0.29.11 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd h1:FA2DzGly3tuBWFjktkJxmqeOVEqgrsUvKMQXAw9xvWE=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53 h1:wjhJRzyeRKpJqMg6XmqQ7cJdhEhE5mSoCh94rWdTVOk=
github.com/pingcap/kvproto v0.0.0-20260408021215-335c5c64af53/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
Expand Down Expand Up @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab h1:t9Kh7tVsSSUMuSPpHChPF6W3VtN4Kq7Gi8EgWPbYRyY=
github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab/go.mod h1:lfRxHwyBp1rjTmNC04SUZ+dqk7i1R1AeJ2zraMQaNvY=
github.com/tikv/client-go/v2 v2.0.8-0.20260415115642-0eed1ff3c43e h1:YvslQEfuAbak6ube/LDKsDuG3qwYpvUpzYxVZI37GWU=
github.com/tikv/client-go/v2 v2.0.8-0.20260415115642-0eed1ff3c43e/go.mod h1:rg9c3yf9lfQdj9rt5FvwRP9xDubUY6C9hyH4g9zFH1s=
github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52 h1:fXIMowblD3qdfHXJYGJpe7SbBlTO4S9GPVZZvL3CPG8=
github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52/go.mod h1:I2yRx/Yf8Y8kgM5f3VNp4a8fWpnjPC4TxWk554AY8bM=
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk=
Expand Down
1 change: 1 addition & 0 deletions pkg/importsdk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_test(
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/lightning/mydump",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/util/table-filter",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Expand Down
10 changes: 9 additions & 1 deletion pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ go_library(
"//pkg/metrics/common",
"//pkg/parser/terror",
"//pkg/timer/metrics",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_prometheus_client_model//go",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//util/collectors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//channelz/grpc_channelz_v1",
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//test/bufconn",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -61,11 +68,12 @@ go_test(
],
embed = [":metrics"],
flaky = True,
shard_count = 5,
shard_count = 8,
deps = [
"//pkg/parser/terror",
"//pkg/statistics/handle/cache",
"//pkg/testkit/testsetup",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
Expand Down
3 changes: 3 additions & 0 deletions pkg/metrics/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.Cleanup(func(int) {
cleanupGrpcChannelzCollectorForTest()
}),
}
goleak.VerifyTestMain(m, opts...)
}
133 changes: 133 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
package metrics

import (
"context"
"net"
"sync"

"github.com/pingcap/tidb/pkg/dxf/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/ingestor/ingestmetric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
tikvcollectors "github.com/tikv/client-go/v2/util/collectors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

var (
Expand Down Expand Up @@ -394,6 +403,9 @@ func RegisterMetrics() {
// StmtSummary
prometheus.MustRegister(StmtSummaryWindowRecordCount)
prometheus.MustRegister(StmtSummaryWindowEvictedCount)

// Channelz
setupChannelzCollector()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would other goleak-based suites that call metrics.RegisterMetrics() directly hit goleak check errors in some cases?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved by c33b6e1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case some integration tests call RegisterMetrics but are NOT compiled with -tags=intest, a6303d8 added the goroutine to the goleak whitelist.

}

// Register registers custom collectors.
Expand Down Expand Up @@ -458,3 +470,124 @@ func ToggleSimplifiedMode(simplified bool) {
}
}
}

// grpcChannelzCollector is the package-level singleton state behind the
// channelz Prometheus collector. The *Locked helpers in this file read and
// write its fields and require mu to be held by the caller.
var grpcChannelzCollector struct {
	// mu serializes setup and teardown of the fields below.
	mu sync.Mutex

	// listener is the bufconn listener the internal gRPC server serves on.
	listener *bufconn.Listener
	// server is the internal gRPC server exposing the channelz service.
	server *grpc.Server
	// conn is the client connection dialed to listener and handed to the collector.
	conn *grpc.ClientConn

	// collector is the Prometheus collector built on top of conn; nil until
	// initialization has succeeded.
	collector prometheus.Collector
	// registered records whether collector has been registered with the
	// default Prometheus registry.
	registered bool
}

// setupChannelzCollector lazily initializes the singleton channelz collector
// and registers it with the default Prometheus registry at most once.
// It does nothing when intest.InTest is set. An initialization failure is
// logged as a warning and otherwise ignored.
func setupChannelzCollector() {
	if intest.InTest {
		return
	}

	state := &grpcChannelzCollector
	state.mu.Lock()
	defer state.mu.Unlock()

	initErr := initGrpcChannelzCollectorLocked()
	switch {
	case initErr != nil:
		logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(initErr))
	case !state.registered:
		// First successful setup: register and remember that we did, so that
		// repeated calls never hit MustRegister twice.
		prometheus.MustRegister(state.collector)
		state.registered = true
	}
}

// initGrpcChannelzCollectorLocked initializes the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
//
// The function is a no-op (returning nil) when a collector already exists.
// Otherwise it:
//  1. creates a bufconn listener and a gRPC server with the channelz service
//     registered on it, and starts serving in a background goroutine;
//  2. creates a gRPC client whose dialer connects to that listener;
//  3. builds the collector from the client connection and channelzCollectorOpts().
//
// If creating the client fails, everything set up so far is torn down through
// stopGrpcChannelzCollectorLocked and the error is returned.
func initGrpcChannelzCollectorLocked() error {
// Already initialized: keep the existing listener/server/conn/collector.
if grpcChannelzCollector.collector != nil {
return nil
}

// bufconn.Listen is called with 1 << 20 (1 MiB) as its size argument.
grpcChannelzCollector.listener = bufconn.Listen(1 << 20)
grpcChannelzCollector.server = grpc.NewServer()
service.RegisterChannelzServiceToServer(grpcChannelzCollector.server)
// The listener and server are passed as arguments so the goroutine keeps
// using these instances even after the package-level fields are reset.
// A non-nil error from Serve is only logged.
go func(listener *bufconn.Listener, server *grpc.Server) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is graceful shutdown of tidb needs to be considered here to close this background thread properly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, it's just for collecting channelz data and won't block graceful shutdown.

if err := server.Serve(listener); err != nil {
logutil.BgLogger().Warn("internal channelz grpc server stopped", zap.Error(err))
}
}(grpcChannelzCollector.listener, grpcChannelzCollector.server)

listener := grpcChannelzCollector.listener
conn, err := grpc.NewClient(
"passthrough:///bufnet",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return listener.DialContext(ctx)
}),
)
if err != nil {
stopGrpcChannelzCollectorLocked()
return err
}

grpcChannelzCollector.conn = conn
grpcChannelzCollector.collector = tikvcollectors.NewChannelzCollector(conn, channelzCollectorOpts())
return nil
}

// channelzCollectorOpts returns the options passed to
// tikvcollectors.NewChannelzCollector: the package-level namespace,
// DisableLocalLabel set to true, and a Filter that selects which channelz
// nodes are collected.
//
// The Filter returns (collect, walkChildren) for each node:
//   - Socket: (true, false)
//   - Subchannel with at least one socket ref and no channel or subchannel
//     refs ("leaf"): (true, true)
//   - any other Subchannel, and every other node type: (false, true)
func channelzCollectorOpts() tikvcollectors.ChannelzCollectorOpts {
return tikvcollectors.ChannelzCollectorOpts{
Namespace: namespace,
DisableLocalLabel: true,
Filter: func(node any) (collect bool, walkChildren bool) {
Comment on lines +538 to +541
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter seems to include the collector’s own internal bufnet connection, so scraping
may inflate tidb_grpc_channelz_* by itself. Should we exclude it here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved by c5f3b5b

// Only collect socket and leaf subchannel info, which are more useful for troubleshooting network issues.
switch n := node.(type) {
// Sockets are collected; their children are not walked.
case *grpc_channelz_v1.Socket:
return true, false

case *grpc_channelz_v1.Subchannel:
// A leaf subchannel references sockets only, with no nested channels
// or subchannels.
isLeaf := len(n.GetSocketRef()) > 0 &&
len(n.GetChannelRef()) == 0 &&
len(n.GetSubchannelRef()) == 0

if isLeaf {
return true, true
}
return false, true

// Every other node type is skipped itself but its children are still walked.
default:
return false, true
}
},
}
}

// cleanupGrpcChannelzCollectorForTest acquires the singleton mutex and tears
// down the channelz collector via stopGrpcChannelzCollectorLocked.
func cleanupGrpcChannelzCollectorForTest() {
	guard := &grpcChannelzCollector.mu
	guard.Lock()
	defer guard.Unlock()

	stopGrpcChannelzCollectorLocked()
}

// stopGrpcChannelzCollectorLocked tears down the singleton channelz collector
// and resets every field to its zero value so a later init starts from scratch.
// The caller must hold grpcChannelzCollector.mu.
//
// Teardown order: unregister the collector (only if it was registered), close
// the client connection, stop the server, then close the listener. Errors from
// the Close calls are intentionally discarded.
func stopGrpcChannelzCollectorLocked() {
	state := &grpcChannelzCollector

	if state.collector != nil && state.registered {
		prometheus.Unregister(state.collector)
	}
	if clientConn := state.conn; clientConn != nil {
		_ = clientConn.Close()
	}
	if srv := state.server; srv != nil {
		srv.Stop()
	}
	if lis := state.listener; lis != nil {
		_ = lis.Close()
	}

	state.server, state.listener, state.conn = nil, nil, nil
	state.collector = nil
	state.registered = false
}
82 changes: 82 additions & 0 deletions pkg/metrics/metrics_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,3 +65,84 @@ func TestStmtSummaryMetricLabels(t *testing.T) {
require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2)))
require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2)))
}

// TestGrpcChannelzCollectorSingleton checks that a second call to
// initGrpcChannelzCollectorLocked reuses the server, listener, connection and
// collector created by the first call, and that cleanup resets every field of
// the singleton.
func TestGrpcChannelzCollectorSingleton(t *testing.T) {
	cleanupGrpcChannelzCollectorForTest()
	t.Cleanup(cleanupGrpcChannelzCollectorForTest)

	state := &grpcChannelzCollector
	// underLock runs body while holding the singleton mutex.
	underLock := func(body func()) {
		state.mu.Lock()
		defer state.mu.Unlock()
		body()
	}

	underLock(func() {
		require.NoError(t, initGrpcChannelzCollectorLocked())
		srvBefore := state.server
		lisBefore := state.listener
		connBefore := state.conn
		collectorBefore := state.collector

		// Re-initializing must leave every component untouched.
		require.NoError(t, initGrpcChannelzCollectorLocked())
		require.Same(t, srvBefore, state.server)
		require.Same(t, lisBefore, state.listener)
		require.Same(t, connBefore, state.conn)
		require.True(t, collectorBefore == state.collector)
	})

	cleanupGrpcChannelzCollectorForTest()

	underLock(func() {
		require.Nil(t, state.server)
		require.Nil(t, state.listener)
		require.Nil(t, state.conn)
		require.Nil(t, state.collector)
		require.False(t, state.registered)
	})
}

// TestSetupChannelzCollectorSkippedInTest checks that setupChannelzCollector
// leaves the singleton untouched (no collector, not registered) when
// intest.InTest is true.
func TestSetupChannelzCollectorSkippedInTest(t *testing.T) {
	cleanupGrpcChannelzCollectorForTest()
	t.Cleanup(cleanupGrpcChannelzCollectorForTest)
	require.True(t, intest.InTest)

	setupChannelzCollector()

	state := &grpcChannelzCollector
	verifyUntouched := func() {
		state.mu.Lock()
		defer state.mu.Unlock()

		require.Nil(t, state.collector)
		require.False(t, state.registered)
	}
	verifyUntouched()
}

// TestGrpcChannelzCollectorGather initializes the channelz collector, registers
// it on a fresh registry and checks that gathering yields the
// tidb_grpc_channelz_fetch_errors_total metric family.
func TestGrpcChannelzCollectorGather(t *testing.T) {
	cleanupGrpcChannelzCollectorForTest()
	t.Cleanup(cleanupGrpcChannelzCollectorForTest)

	// Initialize under the singleton mutex and hand the collector back out.
	collector := func() prometheus.Collector {
		grpcChannelzCollector.mu.Lock()
		defer grpcChannelzCollector.mu.Unlock()

		require.NoError(t, initGrpcChannelzCollectorLocked())
		return grpcChannelzCollector.collector
	}()

	reg := prometheus.NewRegistry()
	require.NoError(t, reg.Register(collector))

	gathered, err := reg.Gather()
	require.NoError(t, err)

	fetchErrors := findMetricFamily(gathered, "tidb_grpc_channelz_fetch_errors_total")
	require.NotNil(t, fetchErrors)
}

// findMetricFamily returns the first entry of families whose name equals name,
// or nil when none matches.
func findMetricFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily {
	for i := range families {
		if candidate := families[i]; candidate.GetName() == name {
			return candidate
		}
	}
	return nil
}
Loading