Skip to content
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
2f33c99
introduce meta service grou
ystaticy May 31, 2026
5c0a12f
bazel prepare
ystaticy Jun 1, 2026
566848f
fix comment
ystaticy Jun 1, 2026
d2b1e76
fix comments
ystaticy Jun 1, 2026
aadbf10
fix comments
ystaticy Jun 1, 2026
986c04d
fix License
ystaticy Jun 1, 2026
e78f709
fix comments
ystaticy Jun 1, 2026
79f521e
fix comments
ystaticy Jun 1, 2026
abeddeb
fix get pd leader
ystaticy Jun 1, 2026
a0883d0
remove get pd leader
ystaticy Jun 1, 2026
16cd477
Update pkg/metaservice/etcd.go
ystaticy Jun 1, 2026
6921c0c
remove get pd leader
ystaticy Jun 1, 2026
0b46469
build_prepare
ystaticy Jun 1, 2026
e59c24b
build_prepare
ystaticy Jun 1, 2026
0058d5d
fix
ystaticy Jun 2, 2026
ece2d4d
fix comments
ystaticy Jun 2, 2026
5fc90a5
fix comments
ystaticy Jun 2, 2026
67d46d2
add ctx
ystaticy Jun 2, 2026
b67212f
fix comments
ystaticy Jun 2, 2026
1d677da
fix comments
ystaticy Jun 2, 2026
c19d2de
fix comments
ystaticy Jun 2, 2026
55e9fdb
fix comments
ystaticy Jun 2, 2026
f9cf01a
fix comments
ystaticy Jun 2, 2026
627e5c4
fix comments
ystaticy Jun 2, 2026
28f07ea
fix comments
ystaticy Jun 2, 2026
8984264
fix comments
ystaticy Jun 2, 2026
1ee79ac
fix comments
ystaticy Jun 2, 2026
e181dec
fix comments
ystaticy Jun 2, 2026
0bd1a2e
fix comments
ystaticy Jun 2, 2026
e770265
fix comments
ystaticy Jun 2, 2026
e34d9b7
fix comments
ystaticy Jun 2, 2026
5cb8ed4
fix comments
ystaticy Jun 2, 2026
07777fa
Update pkg/metaservice/metamanager.go
ystaticy Jun 2, 2026
d2184b1
fix comments
ystaticy Jun 3, 2026
d0402c1
fix name
ystaticy Jun 4, 2026
9074b6c
fix code comments
ystaticy Jun 4, 2026
0dbfd97
update func name
ystaticy Jun 4, 2026
b26858e
update func name
ystaticy Jun 4, 2026
a77a057
rename
ystaticy Jun 4, 2026
486e2ad
add validateGroupID
ystaticy Jun 5, 2026
657fdde
fix comments
ystaticy Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/metaservice/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "metaservice",
srcs = [
"etcd.go",
"metamanager.go",
],
importpath = "github.com/pingcap/tidb/pkg/metaservice",
visibility = ["//visibility:public"],
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "metaservice_test",
timeout = "short",
srcs = [
"etcd_test.go",
"metamanager_test.go",
],
embed = [":metaservice"],
flaky = True,
shard_count = 4,
deps = [
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_tests_v3//integration",
],
)
150 changes: 150 additions & 0 deletions pkg/metaservice/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice

import (
"context"
"errors"
"fmt"
"net"
"net/url"

"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
)

const getAllMembersBackoff = 5000
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming it as getAllMembersBackoffMs to indicate the back unit?


// client is used to implement etcd meta service.
type client struct {
pdCli pd.Client
keyspaceEtcdCli *clientv3.Client
}

// newClient is used to implement etcd meta service.
func newClient(etcdCli *clientv3.Client, pdCli pd.Client) ServiceClient {
if etcdCli == nil {
return nil
}
return &client{
keyspaceEtcdCli: etcdCli,
pdCli: pdCli,
}
}

// GetKeyspaceEtcdCli return etcd client.
func (n *client) GetKeyspaceEtcdCli() *clientv3.Client {
return n.keyspaceEtcdCli
}

// GetPDAddrs implements ServiceClient interface.
func (n *client) GetPDAddrs(ctx context.Context) ([]string, error) {
addrs, err := GetPDHostPorts(ctx, n.pdCli, false)
if err != nil {
return nil, err
}
return addrs, err
}

// GetPDHostPorts returns the PD addresses from PD client.
func GetPDHostPorts(ctx context.Context, pdClient pd.Client, withSchema bool) ([]string, error) {
Comment thread
ystaticy marked this conversation as resolved.
Outdated
pdAddrs := make([]string, 0)
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
if pdClient == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check should place into the first line

return nil, errors.New("PD client not found")
}
for {
members, err := pdClient.GetAllMembers(ctx)
if err != nil {
err := bo.Backoff(tikv.BoRegionMiss(), err)
if err != nil {
return nil, err
}
continue
}
for _, member := range members.GetMembers() {
Comment thread
D3Hunter marked this conversation as resolved.
if len(member.ClientUrls) > 0 {
prefix, host, port, err := ParseURL(member.ClientUrls[0])
if err != nil {
return nil, fmt.Errorf("parse client url from pd members %q: %w", member.ClientUrls[0], err)
}
hostPort := net.JoinHostPort(host, port)
var pdAddr string
if withSchema {
pdAddr = prefix + hostPort // http://ip:port
} else {
pdAddr = hostPort // ip:port
}

pdAddrs = append(pdAddrs, pdAddr)
}
}
if len(pdAddrs) == 0 {
return nil, errors.New("no usable PD client URL found in PD members")
}
return pdAddrs, nil
Comment thread
ystaticy marked this conversation as resolved.
}
}

// ParseURL parses the given URL to get the host and port.
func ParseURL(rawURL string) (prefix string, host string, port string, err error) {
u, parseErr := url.Parse(rawURL)
if parseErr != nil {
return "", "", "", invalidURLHostPortErr()
}

switch u.Scheme {
case "http":
prefix = "http://"
case "https":
prefix = "https://"
default:
return "", "", "", fmt.Errorf("invalid URL prefix")
}

host, port, err = parseHostPort(u.Host)
Comment thread
ystaticy marked this conversation as resolved.
Outdated
if err != nil {
return "", "", "", invalidURLHostPortErr()
}

return prefix, host, port, nil
}

func parseHostPort(rawHostPort string) (host string, port string, err error) {
host, port, err = net.SplitHostPort(rawHostPort)
if err != nil {
return "", "", err
}

if host == "" || port == "" {
return "", "", errors.New("invalid host or port")
}

return host, port, nil
}

func invalidURLHostPortErr() error {
return fmt.Errorf("invalid URL format, expect host:port")
}

// GetPDHttpAddrs is used to get PD http addrs.
func (n *client) GetPDHttpAddrs(ctx context.Context) ([]string, error) {
addrs, err := GetPDHostPorts(ctx, n.pdCli, true)
if err != nil {
return nil, err
}
return addrs, err
}
135 changes: 135 additions & 0 deletions pkg/metaservice/etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice

import (
"context"
"net"
"runtime"
"testing"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/tests/v3/integration"
)

type mockPDClient struct {
pd.Client
members []*pdpb.Member
}

func (c *mockPDClient) GetAllMembers(context.Context) (*pdpb.GetMembersResponse, error) {
return &pdpb.GetMembersResponse{Members: c.members}, nil
}

// ETCD use ip:port as unix socket address, however this address is invalid on windows.
// We have to skip some of the test in such case.
// https://github.com/etcd-io/etcd/blob/f0faa5501d936cd8c9f561bb9d1baca70eb67ab1/pkg/types/urls.go#L42
func unixSocketAvailable() bool {
c, err := net.Listen("unix", "127.0.0.1:0")
if err == nil {
_ = c.Close()
return true
}
return false
}

// TestGetPDAddrsWithRealClient tests the GetPDAddrs method with a real etcd client
func TestGetPDAddrsWithRealClient(t *testing.T) {
integration.BeforeTestExternal(t)
if runtime.GOOS == "windows" {
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
}

// Initialize etcd client
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Comment thread
D3Hunter marked this conversation as resolved.
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

expectAddrs := []string{"127.0.0.1:1111"}
pdCli := &mockPDClient{
members: []*pdpb.Member{{
ClientUrls: []string{"http://127.0.0.1:1111"},
}},
}

serviceClient := newClient(etcdCli, pdCli)
addrs, err := serviceClient.GetPDAddrs(context.Background())
require.NoError(t, err)
require.Equal(t, expectAddrs, addrs)

t.Run("empty client urls returns error", func(t *testing.T) {
pdCli := &mockPDClient{
members: []*pdpb.Member{
{},
{ClientUrls: []string{}},
},
}

serviceClient := newClient(etcdCli, pdCli)
addrs, err := serviceClient.GetPDAddrs(context.Background())
require.Error(t, err)
require.Nil(t, addrs)
require.EqualError(t, err, "no usable PD client URL found in PD members")
})
}

// TestParseURL tests the ParseURL function with various inputs.
func TestParseURL(t *testing.T) {
tests := []struct {
rawURL string
prefix string
host string
port string
err bool
}{
// Successful test cases
{"http://example.com:8080", "http://", "example.com", "8080", false},
{"https://localhost:443", "https://", "localhost", "443", false}, // Specified port for HTTPS
{"http://[2001:db8::1]:2379", "http://", "2001:db8::1", "2379", false},
{"https://[2001:db8::1]:443", "https://", "2001:db8::1", "443", false},

// Unsuccessful test cases
{"ftp://example.com", "ftp://", "", "", true}, // Invalid prefix
{"unix://localhost:m0", "unix://", "", "", true}, // Unix schema is unsupported
{"unix://localhost", "unix://", "", "", true}, // Unix schema is unsupported
{"http://example.com:8080:extra", "http://", "", "", true}, // Extra part after port
{"https://:8080", "https://", "", "", true}, // Missing host
{"http://", "http://", "", "", true}, // Incomplete URL
{"https://example.com", "https://", "", "", true}, // Missing port
{"http://localhost", "http://", "", "", true}, // Missing port
{"https://[2001:db8::1]", "https://", "", "", true}, // Missing port
{"http://2001:db8::1:2379", "http://", "", "", true}, // Unbracketed IPv6 with port
{"https://[2001:db8::1", "https://", "", "", true}, // Invalid bracketed IPv6
}

for _, test := range tests {
prefix, host, port, err := ParseURL(test.rawURL)

// Check if the error status matches the expectation
if test.err {
require.Error(t, err, "Expected an error for input: "+test.rawURL)
require.Empty(t, prefix, "Expected an error for input: "+test.rawURL)
require.Empty(t, host, "Host should be empty for input: "+test.rawURL)
require.Empty(t, port, "Port should be empty for input: "+test.rawURL)
} else {
require.NoError(t, err, "Did not expect an error for input: "+test.rawURL)
require.Equal(t, test.prefix, prefix, "prefix mismatch for input: "+test.rawURL)
require.Equal(t, test.host, host, "Host mismatch for input: "+test.rawURL)
require.Equal(t, test.port, port, "Port mismatch for input: "+test.rawURL)
}
}
}
Loading
Loading