diff --git a/pkg/metaservice/BUILD.bazel b/pkg/metaservice/BUILD.bazel new file mode 100644 index 0000000000000..16c41059c530a --- /dev/null +++ b/pkg/metaservice/BUILD.bazel @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "metaservice", + srcs = [ + "etcd.go", + "metamanager.go", + ], + importpath = "github.com/pingcap/tidb/pkg/metaservice", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/keyspacepb", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "metaservice_test", + timeout = "short", + srcs = [ + "etcd_test.go", + "metamanager_test.go", + ], + embed = [":metaservice"], + flaky = True, + shard_count = 4, + deps = [ + "@com_github_pingcap_kvproto//pkg/keyspacepb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_tests_v3//integration", + ], +) diff --git a/pkg/metaservice/etcd.go b/pkg/metaservice/etcd.go new file mode 100644 index 0000000000000..11164171fce3d --- /dev/null +++ b/pkg/metaservice/etcd.go @@ -0,0 +1,136 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metaservice + +import ( + "context" + "errors" + "fmt" + "net" + "net/url" + + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const getAllMembersBackoff = 5000 + +// client is used to implement etcd meta service. +type client struct { + pdCli pd.Client + keyspaceEtcdCli *clientv3.Client +} + +// newClient is used to implement etcd meta service. +func newClient(etcdCli *clientv3.Client, pdCli pd.Client) ServiceClient { + if etcdCli == nil { + return nil + } + return &client{ + keyspaceEtcdCli: etcdCli, + pdCli: pdCli, + } +} + +// GetKeyspaceEtcdCli return etcd client. +func (n *client) GetKeyspaceEtcdCli() *clientv3.Client { + return n.keyspaceEtcdCli +} + +// GetPDAddrs implements ServiceClient interface. +func (n *client) GetPDAddrs(ctx context.Context) ([]string, error) { + addrs, err := GetPDAddrs(ctx, n.pdCli, false) + if err != nil { + return nil, err + } + return addrs, err +} + +// GetPDAddrs returns the PD addresses from PD client. +func GetPDAddrs(ctx context.Context, pdClient pd.Client, withSchema bool) ([]string, error) { + pdAddrs := make([]string, 0) + bo := tikv.NewBackoffer(ctx, getAllMembersBackoff) + if pdClient == nil { + return nil, errors.New("PD client not found") + } + for { + members, err := pdClient.GetAllMembers(ctx) + if err != nil { + err := bo.Backoff(tikv.BoRegionMiss(), err) + if err != nil { + return nil, err + } + continue + } + for _, member := range members.GetMembers() { + if len(member.ClientUrls) > 0 { + prefix, hostPort, err := ParseURL(member.ClientUrls[0]) + if err != nil { + return nil, fmt.Errorf("parse client url from pd members %q: %w", member.ClientUrls[0], err) + } + var pdAddr string + if withSchema { + pdAddr = prefix + hostPort // http://ip:port + } else { + pdAddr = hostPort // ip:port + } + + pdAddrs = append(pdAddrs, pdAddr) + } + } + if len(pdAddrs) == 0 { + return nil, errors.New("no usable PD client URL found in PD members") + } + return pdAddrs, nil + } +} + +// ParseURL parses the given URL to get the host:port. +func ParseURL(rawURL string) (prefix string, hostPort string, err error) { + u, parseErr := url.Parse(rawURL) + if parseErr != nil { + return "", "", invalidURLHostPortErr() + } + + switch u.Scheme { + case "http": + prefix = "http://" + case "https": + prefix = "https://" + default: + return "", "", fmt.Errorf("invalid URL prefix") + } + + host, port, splitErr := net.SplitHostPort(u.Host) + if splitErr != nil || host == "" || port == "" { + return "", "", invalidURLHostPortErr() + } + + return prefix, u.Host, nil +} + +func invalidURLHostPortErr() error { + return fmt.Errorf("invalid URL format, expect host:port") +} + +// GetPDHttpAddrs is used to get PD http addrs. +func (n *client) GetPDHttpAddrs(ctx context.Context) ([]string, error) { + addrs, err := GetPDAddrs(ctx, n.pdCli, true) + if err != nil { + return nil, err + } + return addrs, err +} diff --git a/pkg/metaservice/etcd_test.go b/pkg/metaservice/etcd_test.go new file mode 100644 index 0000000000000..314ecca24a5e2 --- /dev/null +++ b/pkg/metaservice/etcd_test.go @@ -0,0 +1,132 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metaservice + +import ( + "context" + "net" + "runtime" + "testing" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" + "go.etcd.io/etcd/tests/v3/integration" +) + +type mockPDClient struct { + pd.Client + members []*pdpb.Member +} + +func (c *mockPDClient) GetAllMembers(context.Context) (*pdpb.GetMembersResponse, error) { + return &pdpb.GetMembersResponse{Members: c.members}, nil +} + +// ETCD use ip:port as unix socket address, however this address is invalid on windows. +// We have to skip some of the test in such case. +// https://github.com/etcd-io/etcd/blob/f0faa5501d936cd8c9f561bb9d1baca70eb67ab1/pkg/types/urls.go#L42 +func unixSocketAvailable() bool { + c, err := net.Listen("unix", "127.0.0.1:0") + if err == nil { + _ = c.Close() + return true + } + return false +} + +// TestGetPDAddrsWithRealClient tests the GetPDAddrs method with a real etcd client +func TestGetPDAddrsWithRealClient(t *testing.T) { + integration.BeforeTestExternal(t) + if runtime.GOOS == "windows" { + t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.") + } + + // Initialize etcd client + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + etcdCli := cluster.RandClient() + + expectAddrs := []string{"127.0.0.1:1111"} + pdCli := &mockPDClient{ + members: []*pdpb.Member{{ + ClientUrls: []string{"http://127.0.0.1:1111"}, + }}, + } + + serviceClient := newClient(etcdCli, pdCli) + addrs, err := serviceClient.GetPDAddrs(context.Background()) + require.NoError(t, err) + require.Equal(t, expectAddrs, addrs) + + t.Run("empty client urls returns error", func(t *testing.T) { + pdCli := &mockPDClient{ + members: []*pdpb.Member{ + {}, + {ClientUrls: []string{}}, + }, + } + + serviceClient := newClient(etcdCli, pdCli) + addrs, err := serviceClient.GetPDAddrs(context.Background()) + require.Error(t, err) + require.Nil(t, addrs) + require.EqualError(t, err, "no usable PD client URL found in PD members") + }) +} + +// TestParseURL tests the ParseURL function with various inputs. +func TestParseURL(t *testing.T) { + tests := []struct { + rawURL string + prefix string + hostPort string + err bool + }{ + // Successful test cases + {"http://example.com:8080", "http://", "example.com:8080", false}, + {"https://localhost:443", "https://", "localhost:443", false}, + {"http://[2001:db8::1]:2379", "http://", "[2001:db8::1]:2379", false}, + {"https://[2001:db8::1]:443", "https://", "[2001:db8::1]:443", false}, + + // Unsuccessful test cases + {"ftp://example.com", "ftp://", "", true}, // Invalid prefix + {"unix://localhost:m0", "unix://", "", true}, // Unix schema is unsupported + {"unix://localhost", "unix://", "", true}, // Unix schema is unsupported + {"http://example.com:8080:extra", "http://", "", true}, // Extra part after port + {"https://:8080", "https://", "", true}, // Missing host + {"http://", "http://", "", true}, // Incomplete URL + {"https://example.com", "https://", "", true}, // Missing port + {"http://localhost", "http://", "", true}, // Missing port + {"https://[2001:db8::1]", "https://", "", true}, // Missing port + {"http://2001:db8::1:2379", "http://", "", true}, // Unbracketed IPv6 with port + {"https://[2001:db8::1", "https://", "", true}, // Invalid bracketed IPv6 + } + + for _, test := range tests { + prefix, hostPort, err := ParseURL(test.rawURL) + + // Check if the error status matches the expectation + if test.err { + require.Error(t, err, "Expected an error for input: "+test.rawURL) + require.Empty(t, prefix, "Expected an error for input: "+test.rawURL) + require.Empty(t, hostPort, "hostPort should be empty for input: "+test.rawURL) + } else { + require.NoError(t, err, "Did not expect an error for input: "+test.rawURL) + require.Equal(t, test.prefix, prefix, "prefix mismatch for input: "+test.rawURL) + require.Equal(t, test.hostPort, hostPort, "hostPort mismatch for input: "+test.rawURL) + } + } +} diff --git a/pkg/metaservice/metamanager.go b/pkg/metaservice/metamanager.go new file mode 100644 index 0000000000000..e72a32a30fe40 --- /dev/null +++ b/pkg/metaservice/metamanager.go @@ -0,0 +1,136 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metaservice + +import ( + "context" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +const ( + // GlobalGroupID is global meta service group id, + // it stores some global rules and label, region and other information. + GlobalGroupID = "0" + // MetaServiceGroupIDKey is a keyspace meta config key name, + // the value of this key is meta service group id for this keyspace. + GroupIDKey = "meta_service_group_id" + // MetaGroupAddrsKey is a keyspace meta config key name, + // the value of this key is meta service group addrs for this key. + GroupAddrsKey = "meta_service_group_addrs" +) + +// ErrGroupNotMatch exported for test. +var ErrGroupNotMatch = errors.New("it is unexpected for the keyspace to have a group ID but no group addresses") + +// ErrNilKeyspaceMeta indicates the caller passed a nil keyspace meta to GetKeyspaceMetaServiceGroup. +var ErrNilKeyspaceMeta = errors.New("GetKeyspaceMetaServiceGroup: keyspace meta is nil") + +// Info includes the global meta service address and the TiDB meta service group info. +type Info struct { + PDAddrs []string + GlobalMetaServiceAddrs []string + Group *Group +} + +// Group includes keyspace meta service group info. +type Group struct { + GroupID string + Addrs []string +} + +// GetKeyspaceMetaServiceGroup return keyspace meta service group. +func GetKeyspaceMetaServiceGroup(keyspaceMeta *keyspacepb.KeyspaceMeta, globalMetaAddrs []string) (*Group, error) { + if keyspaceMeta == nil { + return nil, ErrNilKeyspaceMeta + } + var group *Group + // TODO: Refactor meta service group storage format by moving it from config to dedicated fields in keyspace meta. + if val, ok := keyspaceMeta.Config[MetaServiceGroupIDKey]; ok { + groupID := val + addrsStr, addrsOk := keyspaceMeta.Config[MetaGroupAddrsKey] + if !addrsOk { + return nil, ErrGroupNotMatch + } + rawAddrs := strings.Split(strings.TrimSpace(addrsStr), ",") + addrs := make([]string, 0, len(rawAddrs)) + for _, addr := range rawAddrs { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + addrs = append(addrs, addr) + } + if len(addrs) == 0 { + return nil, ErrGroupNotMatch + } + group = &Group{ + GroupID: groupID, + Addrs: addrs, + } + log.Info("get keyspace meta service group info", zap.Any("group-info", group)) + return group, nil + } + + // If keyspace don't have KeyspaceMetaGroupIDKey, then set keyspace meta service as global meta service. + group = &Group{ + GroupID: GlobalGroupID, + Addrs: globalMetaAddrs, + } + log.Info("get default keyspace meta service group info ", zap.Any("group-info", group)) + return group, nil +} + +// GetMetaServiceInfo return meta service info. +func GetMetaServiceInfo(keyspaceMeta *keyspacepb.KeyspaceMeta, globalMetaAddrs []string, pdAddrs []string) (*Info, error) { + // If non-keyspace then return global meta service or not enable meta service group. + if keyspaceMeta == nil { + keyspaceMetaServiceGroup := &Group{ + GroupID: GlobalGroupID, + Addrs: globalMetaAddrs, + } + metaInfo := &Info{ + PDAddrs: pdAddrs, + GlobalMetaServiceAddrs: globalMetaAddrs, + Group: keyspaceMetaServiceGroup, + } + log.Info("return meta service group info", zap.Any("meta-service-info", metaInfo)) + return metaInfo, nil + } + + keyspaceServiceGroup, err := GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + if err != nil { + return nil, err + } + metaInfo := &Info{ + PDAddrs: pdAddrs, + GlobalMetaServiceAddrs: globalMetaAddrs, + Group: keyspaceServiceGroup, + } + log.Info("return keyspace meta service group info", zap.Any("meta-service-info", metaInfo)) + return metaInfo, nil +} + +// ServiceClient is used to request meta service. +type ServiceClient interface { + // GetPDAddrs is used to get pd addrs(host:port). + GetPDAddrs(ctx context.Context) ([]string, error) + // GetPDHttpAddrs is used to get PD http addrs. + GetPDHttpAddrs(ctx context.Context) ([]string, error) +} diff --git a/pkg/metaservice/metamanager_test.go b/pkg/metaservice/metamanager_test.go new file mode 100644 index 0000000000000..6ecd92d69c856 --- /dev/null +++ b/pkg/metaservice/metamanager_test.go @@ -0,0 +1,110 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metaservice_test + +import ( + "errors" + "strings" + "testing" + + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/tidb/pkg/metaservice" + "github.com/stretchr/testify/require" +) + +// TestGetKeyspaceMetaServiceGroup tests the GetKeyspaceMetaServiceGroup function. +func TestGetKeyspaceMetaServiceGroup(t *testing.T) { + globalMetaAddrs := []string{"127.0.0.1:2379"} + expectedAddrsStr := "127.0.0.1:2388,127.0.0.1:2389" + expectedAddrs := strings.Split(expectedAddrsStr, ",") + + // Test case where keyspaceMeta is nil + keyspaceMetaServiceGroup, err := metaservice.GetKeyspaceMetaServiceGroup(nil, globalMetaAddrs) + require.Nil(t, keyspaceMetaServiceGroup) + require.Error(t, err) + require.True(t, errors.Is(err, metaservice.ErrNilKeyspaceMeta)) + + // Test case with a valid group ID and addresses + keyspaceMeta := &keyspacepb.KeyspaceMeta{ + Config: map[string]string{ + metaservice.MetaServiceGroupIDKey: "1", // Valid numeric string + metaservice.MetaGroupAddrsKey: expectedAddrsStr, + }, + } + + keyspaceMetaServiceGroup, err = metaservice.GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + require.NoError(t, err) + require.Equal(t, "1", keyspaceMetaServiceGroup.GroupID) + + require.ElementsMatch(t, expectedAddrs, keyspaceMetaServiceGroup.Addrs) + + // Test case with blank entries in addresses + keyspaceMeta.Config[metaservice.MetaGroupAddrsKey] = " 127.0.0.1:2388, ,127.0.0.1:2389, " + keyspaceMetaServiceGroup, err = metaservice.GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + require.NoError(t, err) + require.Equal(t, "1", keyspaceMetaServiceGroup.GroupID) + require.ElementsMatch(t, expectedAddrs, keyspaceMetaServiceGroup.Addrs) + + // Test case where all addresses are blank + keyspaceMeta.Config[metaservice.MetaGroupAddrsKey] = " , \t, " + keyspaceMetaServiceGroup, err = metaservice.GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + require.Nil(t, keyspaceMetaServiceGroup) + require.Error(t, err) + require.True(t, errors.Is(err, metaservice.ErrGroupNotMatch)) + + // Test case where the group ID exists but addresses do not + delete(keyspaceMeta.Config, metaservice.MetaGroupAddrsKey) + keyspaceMetaServiceGroup, err = metaservice.GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + require.Error(t, err) + require.True(t, errors.Is(err, metaservice.ErrGroupNotMatch)) + + // Test case where the group ID does not exist + delete(keyspaceMeta.Config, metaservice.MetaServiceGroupIDKey) + keyspaceMetaServiceGroup, err = metaservice.GetKeyspaceMetaServiceGroup(keyspaceMeta, globalMetaAddrs) + require.NoError(t, err) + require.Equal(t, metaservice.GlobalGroupID, keyspaceMetaServiceGroup.GroupID) + require.ElementsMatch(t, globalMetaAddrs, keyspaceMetaServiceGroup.Addrs) +} + +// TestGetMetaServiceInfo tests the GetMetaServiceInfo function. +func TestGetMetaServiceInfo(t *testing.T) { + expectPDAddrs := []string{"127.0.0.1:2380"} + globalMetaAddrs := []string{"127.0.0.1:2379"} + + // Test case where keyspaceMeta is nil + metaInfo, err := metaservice.GetMetaServiceInfo(nil, globalMetaAddrs, expectPDAddrs) + require.NoError(t, err) + require.NotNil(t, metaInfo) + require.Equal(t, globalMetaAddrs[0], metaInfo.GlobalMetaServiceAddrs[0]) + require.Equal(t, "0", metaInfo.Group.GroupID) + require.Equal(t, expectPDAddrs, metaInfo.PDAddrs) + + // Test case with a valid keyspaceMeta + keyspaceMeta := &keyspacepb.KeyspaceMeta{ + Config: map[string]string{ + metaservice.MetaServiceGroupIDKey: "2", // Valid numeric string + metaservice.MetaGroupAddrsKey: "127.0.0.1:2388,127.0.0.1:2389", + }, + } + + metaInfo, err = metaservice.GetMetaServiceInfo(keyspaceMeta, globalMetaAddrs, expectPDAddrs) + require.NoError(t, err) + require.NotNil(t, metaInfo) + require.Equal(t, globalMetaAddrs[0], metaInfo.GlobalMetaServiceAddrs[0]) + require.Equal(t, "2", metaInfo.Group.GroupID) + require.Equal(t, expectPDAddrs, metaInfo.PDAddrs) + expectedAddrs := []string{"127.0.0.1:2388", "127.0.0.1:2389"} + require.ElementsMatch(t, expectedAddrs, metaInfo.Group.Addrs) +}