Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions pkg/metaservice/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "metaservice",
srcs = [
"etcd.go",
"metamanager.go",
],
importpath = "github.com/pingcap/tidb/pkg/metaservice",
visibility = ["//visibility:public"],
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "metaservice_test",
timeout = "short",
srcs = [
"etcd_test.go",
"metamanager_test.go",
],
flaky = True,
shard_count = 5,
Comment thread
ystaticy marked this conversation as resolved.
Outdated
deps = [
":metaservice",
"//pkg/store/mockstore",
"//pkg/testkit",
"@com_github_pingcap_kvproto//pkg/keyspacepb",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_zap//:zap",
],
)
216 changes: 216 additions & 0 deletions pkg/metaservice/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"strings"

"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

const getAllMembersBackoff = 5000

// EtcdMetaServiceClient is used to implement etcd meta service.
type EtcdMetaServiceClient struct {
pdCli pd.Client
KeyspaceEtcdCli *clientv3.Client
Comment thread
ystaticy marked this conversation as resolved.
Outdated
}

// NewEtcdMetaServiceClient is used to implement etcd meta service.
func NewEtcdMetaServiceClient(etcdCli *clientv3.Client, pdCli pd.Client) ServiceClient {
Comment thread
ystaticy marked this conversation as resolved.
Outdated
if etcdCli == nil {
return nil
}
return &EtcdMetaServiceClient{
KeyspaceEtcdCli: etcdCli,
pdCli: pdCli,
}
}

// GetKeyspaceEtcdCli return etcd client.
func (n *EtcdMetaServiceClient) GetKeyspaceEtcdCli() *clientv3.Client {
return n.KeyspaceEtcdCli
}

// GetPDAddrs implements ServiceClient interface.
func (n *EtcdMetaServiceClient) GetPDAddrs() ([]string, error) {
addrs, err := GetPDHostPorts(context.Background(), n.pdCli, false)
if err != nil {
return nil, err
}
return addrs, err
}

// GetPDLeaderAddrs implements ServiceClient interface.
func (n *EtcdMetaServiceClient) GetPDLeaderAddrs(ctx context.Context) (string, zap.Field) {
// todo: PD GetAllMembers should directly return which is the pd leader.
// Don't use etcd client to get PD leader.

var (
leaderAddr string
errMsgMap = map[string]string{}
)
for _, addr := range n.KeyspaceEtcdCli.Endpoints() {
status, err := n.KeyspaceEtcdCli.Status(ctx, addr)
if err != nil {
errMsgMap[addr] = err.Error()
continue
}
if status.Leader == status.Header.MemberId {
leaderAddr = addr
break
}
}

errMsgField := zap.Skip()
if len(errMsgMap) > 0 {
errMsgField = zap.Any("errors when find leader", errMsgMap)
}
return leaderAddr, errMsgField
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

// GetPDHostPorts returns the PD addresses from PD client.
func GetPDHostPorts(ctx context.Context, pdClient pd.Client, hasPrefix bool) ([]string, error) {
Comment thread
ystaticy marked this conversation as resolved.
Outdated
pdAddrs := make([]string, 0)
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
if pdClient == nil {
return nil, errors.New("PD client not found")
}
for {
members, err := pdClient.GetAllMembers(ctx)
if err != nil {
err := bo.Backoff(tikv.BoRegionMiss(), err)
if err != nil {
return nil, err
}
continue
}
for _, member := range members.GetMembers() {
Comment thread
D3Hunter marked this conversation as resolved.
if len(member.ClientUrls) > 0 {
prefix, host, port, err := ParseURL(member.ClientUrls[0])
if err != nil {
return nil, fmt.Errorf("parse client url from pd members %q: %w", member.ClientUrls[0], err)
}
var pdAddr string
if hasPrefix {
pdAddr = prefix + host + ":" + port // http://ip:port
} else {
pdAddr = host + ":" + port // ip:port
}

pdAddrs = append(pdAddrs, pdAddr)
}
}
return pdAddrs, nil
Comment thread
ystaticy marked this conversation as resolved.
}
}

// ParseURL parses the given URL to get the host and port.
func ParseURL(rawURL string) (prefix string, host string, port string, err error) {
switch {
case strings.HasPrefix(rawURL, "unix://"):
Comment thread
D3Hunter marked this conversation as resolved.
Outdated
Comment thread
ystaticy marked this conversation as resolved.
Outdated
prefix = "unix://"
case strings.HasPrefix(rawURL, "http://"):
prefix = "http://"
case strings.HasPrefix(rawURL, "https://"):
prefix = "https://"
default:
return "", "", "", fmt.Errorf("invalid URL prefix")
}

if prefix == "unix://" {
host, port, err = parseHostPort(strings.TrimPrefix(rawURL, prefix))
if err != nil {
return "", "", "", fmt.Errorf("invalid URL format, expect host:port")
}
return prefix, host, port, nil
}

u, parseErr := url.Parse(rawURL)
if parseErr != nil || u.Host == "" {
return "", "", "", fmt.Errorf("invalid URL format, expect host:port")
}

host = u.Host
if strings.Contains(u.Host, ":") {
splitHost, splitPort, splitErr := net.SplitHostPort(u.Host)
switch {
case splitErr == nil:
host = splitHost
port = splitPort
case isMissingPortErr(splitErr):
host = u.Hostname()
default:
return "", "", "", fmt.Errorf("invalid URL format, expect host:port")
}
}

if host == "" {
return "", "", "", fmt.Errorf("invalid URL format, expect host:port")
}

if port == "" {
switch u.Scheme {
case "http":
port = "80"
case "https":
port = "443"
default:
return "", "", "", fmt.Errorf("invalid URL format, expect host:port")
}
}
Comment thread
D3Hunter marked this conversation as resolved.
Outdated

if strings.Contains(host, ":") {
host = "[" + host + "]"
}

return prefix, host, port, nil
}

func parseHostPort(rawHostPort string) (host string, port string, err error) {
host, port, err = net.SplitHostPort(rawHostPort)
if err != nil {
return "", "", err
}
if strings.Contains(host, ":") {
host = "[" + host + "]"
}
Comment thread
ystaticy marked this conversation as resolved.
Outdated
if host == "" || port == "" {
return "", "", errors.New("invalid host or port")
}
return host, port, nil
}

func isMissingPortErr(err error) bool {
var addrErr *net.AddrError
return errors.As(err, &addrErr) && addrErr.Err == "missing port in address"
}

// GetPDHttpAddrs is used to get PD http addrs.
func (n *EtcdMetaServiceClient) GetPDHttpAddrs() ([]string, error) {
addrs, err := GetPDHostPorts(context.Background(), n.pdCli, true)
if err != nil {
return nil, err
}
return addrs, err
}
141 changes: 141 additions & 0 deletions pkg/metaservice/etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metaservice_test

import (
"context"
"net"
"testing"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/tidb/pkg/metaservice"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/tests/v3/integration"
"go.uber.org/zap"
)

type mockPDClient struct {
pd.Client
members []*pdpb.Member
}

func (c *mockPDClient) GetAllMembers(context.Context) (*pdpb.GetMembersResponse, error) {
return &pdpb.GetMembersResponse{Members: c.members}, nil
}

// ETCD use ip:port as unix socket address, however this address is invalid on windows.
// We have to skip some of the test in such case.
// https://github.com/etcd-io/etcd/blob/f0faa5501d936cd8c9f561bb9d1baca70eb67ab1/pkg/types/urls.go#L42
func unixSocketAvailable() bool {
c, err := net.Listen("unix", "127.0.0.1:0")
if err == nil {
_ = c.Close()
return true
}
return false
}

// TestGetPDAddrsWithRealClient tests the GetPDAddrs method with a real etcd client
func TestGetPDAddrsWithRealClient(t *testing.T) {
integration.BeforeTestExternal(t)
if !unixSocketAvailable() {
Comment thread
ystaticy marked this conversation as resolved.
Outdated
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
}

// Initialize etcd client
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Comment thread
D3Hunter marked this conversation as resolved.
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

expectAddrs := []string{"127.0.0.1:1111"}
pdCli := &mockPDClient{
members: []*pdpb.Member{{
ClientUrls: []string{"http://127.0.0.1:1111"},
}},
}

serviceClient := metaservice.NewEtcdMetaServiceClient(etcdCli, pdCli)
addrs, err := serviceClient.GetPDAddrs()
require.NoError(t, err)
require.Equal(t, expectAddrs, addrs)
}

// TestGetPDLeaderAddrsWithRealClient tests the GetPDLeaderAddrs method with a real etcd client
func TestGetPDLeaderAddrsWithRealClient(t *testing.T) {
integration.BeforeTestExternal(t)
if !unixSocketAvailable() {
t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.")
}

// Initialize etcd client
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

serviceClient := metaservice.NewEtcdMetaServiceClient(etcdCli, nil)
ctx := context.Background()

leaderAddr, errMsgField := serviceClient.GetPDLeaderAddrs(ctx)

require.NotEmpty(t, leaderAddr, "Leader address should not be empty")
require.Equal(t, zap.Skip(), errMsgField) // No errors should be present
}

// TestParseURL tests the ParseURL function with various inputs.
func TestParseURL(t *testing.T) {
tests := []struct {
rawURL string
prefix string
host string
port string
err bool
}{
// Successful test cases
{"unix://localhost:m0", "unix://", "localhost", "m0", false},
{"http://example.com:8080", "http://", "example.com", "8080", false},
{"https://example.com", "https://", "example.com", "443", false}, // Default port for HTTPS
{"http://localhost", "http://", "localhost", "80", false}, // Default port for HTTP
{"https://localhost:443", "https://", "localhost", "443", false}, // Specified port for HTTPS
{"http://[2001:db8::1]:2379", "http://", "[2001:db8::1]", "2379", false},
{"https://[2001:db8::1]", "https://", "[2001:db8::1]", "443", false},

// Unsuccessful test cases
{"ftp://example.com", "ftp://", "", "", true}, // Invalid prefix
{"unix://localhost", "unix://", "localhost", "", true}, // Missing port
{"http://example.com:8080:extra", "http://", "", "", true}, // Extra part after port
{"https://:8080", "https://", "", "", true}, // Missing host
{"http://", "http://", "", "", true}, // Incomplete URL
{"http://2001:db8::1:2379", "http://", "", "", true}, // Unbracketed IPv6 with port
{"https://[2001:db8::1", "https://", "", "", true}, // Invalid bracketed IPv6
}

for _, test := range tests {
prefix, host, port, err := metaservice.ParseURL(test.rawURL)

// Check if the error status matches the expectation
if test.err {
require.Error(t, err, "Expected an error for input: "+test.rawURL)
require.Empty(t, prefix, "Expected an error for input: "+test.rawURL)
require.Empty(t, host, "Host should be empty for input: "+test.rawURL)
require.Empty(t, port, "Port should be empty for input: "+test.rawURL)
} else {
require.NoError(t, err, "Did not expect an error for input: "+test.rawURL)
require.Equal(t, test.prefix, prefix, "prefix mismatch for input: "+test.rawURL)
require.Equal(t, test.host, host, "Host mismatch for input: "+test.rawURL)
require.Equal(t, test.port, port, "Port mismatch for input: "+test.rawURL)
}
}
}
Loading
Loading