Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/metaservice/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "metaservice",
srcs = [
"etcd.go",
"metamanager.go",
],
importpath = "github.com/pingcap/tidb/pkg/metaservice",
visibility = ["//visibility:public"],
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "metaservice_test",
timeout = "short",
srcs = [
"etcd_test.go",
"metamanager_test.go",
],
embed = [":metaservice"],
flaky = True,
shard_count = 4,
deps = [
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_tests_v3//integration",
],
)
136 changes: 136 additions & 0 deletions pkg/metaservice/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice

import (
"context"
"errors"
"fmt"
"net"
"net/url"

"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
)

const getAllMembersBackoff = 5000

// client is used to implement etcd meta service.
type client struct {
pdCli pd.Client
keyspaceEtcdCli *clientv3.Client
}

// newClient is used to implement etcd meta service.
func newClient(etcdCli *clientv3.Client, pdCli pd.Client) ServiceClient {
if etcdCli == nil {
return nil
}
return &client{
keyspaceEtcdCli: etcdCli,
pdCli: pdCli,
}
}

// GetKeyspaceEtcdCli return etcd client.
func (n *client) GetKeyspaceEtcdCli() *clientv3.Client {
return n.keyspaceEtcdCli
}

// GetPDAddrs implements ServiceClient interface.
func (n *client) GetPDAddrs(ctx context.Context) ([]string, error) {
addrs, err := GetPDAddrs(ctx, n.pdCli, false)
if err != nil {
return nil, err
}
return addrs, err
}

// GetPDAddrs returns the PD addresses from PD client.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [Major] PD member endpoint extraction now has two canonical implementations

Why
The new metaservice package reimplements PD member retry and ClientUrls parsing instead of sharing the existing TiKV store path that already derives client endpoints from the same PD members.

Scope
pkg/metaservice/etcd.go:62 and pkg/store/driver/tikv_driver.go:300

Risk if unchanged
The two paths can drift on retry context, config override behavior, URL validation, scheme handling, and empty-member behavior. A future meta-service caller may get different endpoints than the existing store helper for the same PD members.

Evidence
GetPDHostPorts builds a tikv Backoffer, calls pdClient.GetAllMembers, parses member.ClientUrls[0], and appends host or host-with-scheme. tikvStore.EtcdAddrs already builds a Backoffer, calls s.GetPDClient().GetAllMembers, parses the same member.ClientUrls[0], and appends u.Host.

Change request
Can we keep one canonical helper for PD member endpoint extraction and have both call sites use it?

func GetPDAddrs(ctx context.Context, pdClient pd.Client, withSchema bool) ([]string, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [Minor] Address helper name hides whether it returns host-ports or URLs

Why
The helper is named GetPDHostPorts, but the withSchema flag changes the result from bare host:port values into URL strings with http:// or https:// prefixes. The flag name also says schema, which is an overloaded TiDB domain term and is not the URL component being controlled.

Scope
pkg/metaservice/etcd.go:63

Risk if unchanged
Callers can pass the boolean with the wrong expectation and route a URL where a host-port is required, or the reverse, because the name does not make the contract visible at call sites.

Evidence
GetPDAddrs calls GetPDHostPorts(ctx, n.pdCli, false) for host:port, while GetPDHttpAddrs calls GetPDHostPorts(ctx, n.pdCli, true) and line 87 prepends the parsed URL prefix. Existing TiDB naming uses scheme for this concept, for example GetPDsAddrWithoutScheme in pkg/util/util.go:344.

Change request
Prefer splitting the helper by return shape, or rename the flag to includeScheme/withScheme and update the function/comment so the public contract clearly says whether it returns bare host-ports or URLs.

pdAddrs := make([]string, 0)
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
if pdClient == nil {
return nil, errors.New("PD client not found")
}
for {
members, err := pdClient.GetAllMembers(ctx)
if err != nil {
err := bo.Backoff(tikv.BoRegionMiss(), err)
if err != nil {
return nil, err
}
continue
}
for _, member := range members.GetMembers() {
Comment thread
D3Hunter marked this conversation as resolved.
if len(member.ClientUrls) > 0 {
prefix, hostPort, err := ParseURL(member.ClientUrls[0])
if err != nil {
return nil, fmt.Errorf("parse client url from pd members %q: %w", member.ClientUrls[0], err)
}
var pdAddr string
if withSchema {
pdAddr = prefix + hostPort // http://ip:port
} else {
pdAddr = hostPort // ip:port
}

pdAddrs = append(pdAddrs, pdAddr)
}
}
if len(pdAddrs) == 0 {
return nil, errors.New("no usable PD client URL found in PD members")
}
return pdAddrs, nil
Comment thread
ystaticy marked this conversation as resolved.
}
}

// ParseURL parses the given URL to get the host:port.
func ParseURL(rawURL string) (prefix string, hostPort string, err error) {
u, parseErr := url.Parse(rawURL)
if parseErr != nil {
return "", "", invalidURLHostPortErr()
}

switch u.Scheme {
case "http":
prefix = "http://"
case "https":
prefix = "https://"
default:
return "", "", fmt.Errorf("invalid URL prefix")
}

host, port, splitErr := net.SplitHostPort(u.Host)
if splitErr != nil || host == "" || port == "" {
return "", "", invalidURLHostPortErr()
}

return prefix, u.Host, nil
}

func invalidURLHostPortErr() error {
return fmt.Errorf("invalid URL format, expect host:port")
}

// GetPDHttpAddrs is used to get PD http addrs.
func (n *client) GetPDHttpAddrs(ctx context.Context) ([]string, error) {
addrs, err := GetPDAddrs(ctx, n.pdCli, true)
if err != nil {
return nil, err
}
return addrs, err
}
132 changes: 132 additions & 0 deletions pkg/metaservice/etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice

import (
"context"
"net"
"runtime"
"testing"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/tests/v3/integration"
)

type mockPDClient struct {
pd.Client
members []*pdpb.Member
}

func (c *mockPDClient) GetAllMembers(context.Context) (*pdpb.GetMembersResponse, error) {
return &pdpb.GetMembersResponse{Members: c.members}, nil
}

// ETCD use ip:port as unix socket address, however this address is invalid on windows.
// We have to skip some of the test in such case.
// https://github.com/etcd-io/etcd/blob/f0faa5501d936cd8c9f561bb9d1baca70eb67ab1/pkg/types/urls.go#L42
func unixSocketAvailable() bool {
c, err := net.Listen("unix", "127.0.0.1:0")
if err == nil {
_ = c.Close()
return true
}
return false
}

// TestGetPDAddrsWithRealClient tests the GetPDAddrs method with a real etcd client
func TestGetPDAddrsWithRealClient(t *testing.T) {
integration.BeforeTestExternal(t)
if runtime.GOOS == "windows" {
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
}

// Initialize etcd client
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Comment thread
D3Hunter marked this conversation as resolved.
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

expectAddrs := []string{"127.0.0.1:1111"}
pdCli := &mockPDClient{
members: []*pdpb.Member{{
ClientUrls: []string{"http://127.0.0.1:1111"},
}},
}

serviceClient := newClient(etcdCli, pdCli)
addrs, err := serviceClient.GetPDAddrs(context.Background())
require.NoError(t, err)
require.Equal(t, expectAddrs, addrs)

t.Run("empty client urls returns error", func(t *testing.T) {
pdCli := &mockPDClient{
members: []*pdpb.Member{
{},
{ClientUrls: []string{}},
},
}

serviceClient := newClient(etcdCli, pdCli)
addrs, err := serviceClient.GetPDAddrs(context.Background())
require.Error(t, err)
require.Nil(t, addrs)
require.EqualError(t, err, "no usable PD client URL found in PD members")
})
}

// TestParseURL tests the ParseURL function with various inputs.
func TestParseURL(t *testing.T) {
tests := []struct {
rawURL string
prefix string
hostPort string
err bool
}{
// Successful test cases
{"http://example.com:8080", "http://", "example.com:8080", false},
{"https://localhost:443", "https://", "localhost:443", false},
{"http://[2001:db8::1]:2379", "http://", "[2001:db8::1]:2379", false},
{"https://[2001:db8::1]:443", "https://", "[2001:db8::1]:443", false},

// Unsuccessful test cases
{"ftp://example.com", "ftp://", "", true}, // Invalid prefix
{"unix://localhost:m0", "unix://", "", true}, // Unix schema is unsupported
{"unix://localhost", "unix://", "", true}, // Unix schema is unsupported
{"http://example.com:8080:extra", "http://", "", true}, // Extra part after port
{"https://:8080", "https://", "", true}, // Missing host
{"http://", "http://", "", true}, // Incomplete URL
{"https://example.com", "https://", "", true}, // Missing port
{"http://localhost", "http://", "", true}, // Missing port
{"https://[2001:db8::1]", "https://", "", true}, // Missing port
{"http://2001:db8::1:2379", "http://", "", true}, // Unbracketed IPv6 with port
{"https://[2001:db8::1", "https://", "", true}, // Invalid bracketed IPv6
}

for _, test := range tests {
prefix, hostPort, err := ParseURL(test.rawURL)

// Check if the error status matches the expectation
if test.err {
require.Error(t, err, "Expected an error for input: "+test.rawURL)
require.Empty(t, prefix, "Expected an error for input: "+test.rawURL)
require.Empty(t, hostPort, "hostPort should be empty for input: "+test.rawURL)
} else {
require.NoError(t, err, "Did not expect an error for input: "+test.rawURL)
require.Equal(t, test.prefix, prefix, "prefix mismatch for input: "+test.rawURL)
require.Equal(t, test.hostPort, hostPort, "hostPort mismatch for input: "+test.rawURL)
}
}
}
Loading
Loading