-
Notifications
You must be signed in to change notification settings - Fork 6.2k
*: introduce meta service group #68818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 32 commits
2f33c99
5c0a12f
566848f
d2b1e76
aadbf10
986c04d
e78f709
79f521e
abeddeb
a0883d0
16cd477
6921c0c
0b46469
e59c24b
0058d5d
ece2d4d
5fc90a5
67d46d2
b67212f
1d677da
c19d2de
55e9fdb
f9cf01a
627e5c4
28f07ea
8984264
1ee79ac
e181dec
0bd1a2e
e770265
e34d9b7
5cb8ed4
07777fa
d2184b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
|
||
| go_library( | ||
| name = "metaservice", | ||
| srcs = [ | ||
| "etcd.go", | ||
| "metamanager.go", | ||
| ], | ||
| importpath = "github.com/pingcap/tidb/pkg/metaservice", | ||
| visibility = ["//visibility:public"], | ||
| deps = [ | ||
| "@com_github_pingcap_errors//:errors", | ||
| "@com_github_pingcap_kvproto//pkg/keyspacepb", | ||
| "@com_github_pingcap_log//:log", | ||
| "@com_github_tikv_client_go_v2//tikv", | ||
| "@com_github_tikv_pd_client//:client", | ||
| "@io_etcd_go_etcd_client_v3//:client", | ||
| "@org_uber_go_zap//:zap", | ||
| ], | ||
| ) | ||
|
|
||
| go_test( | ||
| name = "metaservice_test", | ||
| timeout = "short", | ||
| srcs = [ | ||
| "etcd_test.go", | ||
| "metamanager_test.go", | ||
| ], | ||
| embed = [":metaservice"], | ||
| flaky = True, | ||
| shard_count = 4, | ||
| deps = [ | ||
| "@com_github_pingcap_kvproto//pkg/keyspacepb", | ||
| "@com_github_pingcap_kvproto//pkg/pdpb", | ||
| "@com_github_stretchr_testify//require", | ||
| "@com_github_tikv_pd_client//:client", | ||
| "@io_etcd_go_etcd_tests_v3//integration", | ||
| ], | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| // Copyright 2026 PingCAP, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package metaservice | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "net" | ||
| "net/url" | ||
|
|
||
| "github.com/tikv/client-go/v2/tikv" | ||
| pd "github.com/tikv/pd/client" | ||
| clientv3 "go.etcd.io/etcd/client/v3" | ||
| ) | ||
|
|
||
| const getAllMembersBackoff = 5000 | ||
|
|
||
| // client is used to implement etcd meta service. | ||
| type client struct { | ||
| pdCli pd.Client | ||
| keyspaceEtcdCli *clientv3.Client | ||
| } | ||
|
|
||
| // newClient is used to implement etcd meta service. | ||
| func newClient(etcdCli *clientv3.Client, pdCli pd.Client) ServiceClient { | ||
| if etcdCli == nil { | ||
| return nil | ||
| } | ||
| return &client{ | ||
| keyspaceEtcdCli: etcdCli, | ||
| pdCli: pdCli, | ||
| } | ||
| } | ||
|
|
||
| // GetKeyspaceEtcdCli return etcd client. | ||
| func (n *client) GetKeyspaceEtcdCli() *clientv3.Client { | ||
| return n.keyspaceEtcdCli | ||
| } | ||
|
|
||
| // GetPDAddrs implements ServiceClient interface. | ||
| func (n *client) GetPDAddrs(ctx context.Context) ([]string, error) { | ||
| addrs, err := GetPDAddrs(ctx, n.pdCli, false) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return addrs, err | ||
| } | ||
|
|
||
| // GetPDAddrs returns the PD addresses from PD client. | ||
| func GetPDAddrs(ctx context.Context, pdClient pd.Client, withSchema bool) ([]string, error) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 [Minor] Address helper name hides whether it returns host-ports or URLsWhy Scope Risk if unchanged Evidence Change request |
||
| pdAddrs := make([]string, 0) | ||
| bo := tikv.NewBackoffer(ctx, getAllMembersBackoff) | ||
| if pdClient == nil { | ||
| return nil, errors.New("PD client not found") | ||
| } | ||
| for { | ||
| members, err := pdClient.GetAllMembers(ctx) | ||
| if err != nil { | ||
| err := bo.Backoff(tikv.BoRegionMiss(), err) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| continue | ||
| } | ||
| for _, member := range members.GetMembers() { | ||
|
D3Hunter marked this conversation as resolved.
|
||
| if len(member.ClientUrls) > 0 { | ||
| prefix, host, port, err := ParseURL(member.ClientUrls[0]) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("parse client url from pd members %q: %w", member.ClientUrls[0], err) | ||
| } | ||
| hostPort := net.JoinHostPort(host, port) | ||
| var pdAddr string | ||
| if withSchema { | ||
| pdAddr = prefix + hostPort // http://ip:port | ||
| } else { | ||
| pdAddr = hostPort // ip:port | ||
| } | ||
|
|
||
| pdAddrs = append(pdAddrs, pdAddr) | ||
| } | ||
| } | ||
| if len(pdAddrs) == 0 { | ||
| return nil, errors.New("no usable PD client URL found in PD members") | ||
| } | ||
| return pdAddrs, nil | ||
|
ystaticy marked this conversation as resolved.
|
||
| } | ||
| } | ||
|
|
||
| // ParseURL parses the given URL to get the host and port. | ||
| func ParseURL(rawURL string) (prefix string, host string, port string, err error) { | ||
| u, parseErr := url.Parse(rawURL) | ||
| if parseErr != nil { | ||
| return "", "", "", invalidURLHostPortErr() | ||
| } | ||
|
|
||
| switch u.Scheme { | ||
| case "http": | ||
| prefix = "http://" | ||
| case "https": | ||
| prefix = "https://" | ||
| default: | ||
| return "", "", "", fmt.Errorf("invalid URL prefix") | ||
| } | ||
|
|
||
| host, port, err = parseHostPort(u.Host) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we will |
||
| if err != nil { | ||
| return "", "", "", invalidURLHostPortErr() | ||
| } | ||
|
|
||
| return prefix, host, port, nil | ||
| } | ||
|
|
||
| func parseHostPort(rawHostPort string) (host string, port string, err error) { | ||
| host, port, err = net.SplitHostPort(rawHostPort) | ||
| if err != nil { | ||
| return "", "", err | ||
| } | ||
|
|
||
| if host == "" || port == "" { | ||
| return "", "", errors.New("invalid host or port") | ||
| } | ||
|
|
||
| return host, port, nil | ||
| } | ||
|
|
||
| func invalidURLHostPortErr() error { | ||
| return fmt.Errorf("invalid URL format, expect host:port") | ||
| } | ||
|
|
||
| // GetPDHttpAddrs is used to get PD http addrs. | ||
| func (n *client) GetPDHttpAddrs(ctx context.Context) ([]string, error) { | ||
| addrs, err := GetPDAddrs(ctx, n.pdCli, true) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return addrs, err | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| // Copyright 2026 PingCAP, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package metaservice | ||
|
|
||
| import ( | ||
| "context" | ||
| "net" | ||
| "runtime" | ||
| "testing" | ||
|
|
||
| "github.com/pingcap/kvproto/pkg/pdpb" | ||
| "github.com/stretchr/testify/require" | ||
| pd "github.com/tikv/pd/client" | ||
| "go.etcd.io/etcd/tests/v3/integration" | ||
| ) | ||
|
|
||
| type mockPDClient struct { | ||
| pd.Client | ||
| members []*pdpb.Member | ||
| } | ||
|
|
||
| func (c *mockPDClient) GetAllMembers(context.Context) (*pdpb.GetMembersResponse, error) { | ||
| return &pdpb.GetMembersResponse{Members: c.members}, nil | ||
| } | ||
|
|
||
| // ETCD use ip:port as unix socket address, however this address is invalid on windows. | ||
| // We have to skip some of the test in such case. | ||
| // https://github.com/etcd-io/etcd/blob/f0faa5501d936cd8c9f561bb9d1baca70eb67ab1/pkg/types/urls.go#L42 | ||
| func unixSocketAvailable() bool { | ||
| c, err := net.Listen("unix", "127.0.0.1:0") | ||
| if err == nil { | ||
| _ = c.Close() | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // TestGetPDAddrsWithRealClient tests the GetPDAddrs method with a real etcd client | ||
| func TestGetPDAddrsWithRealClient(t *testing.T) { | ||
| integration.BeforeTestExternal(t) | ||
| if runtime.GOOS == "windows" { | ||
| t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.") | ||
| } | ||
|
|
||
| // Initialize etcd client | ||
| cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) | ||
|
D3Hunter marked this conversation as resolved.
|
||
| defer cluster.Terminate(t) | ||
| etcdCli := cluster.RandClient() | ||
|
|
||
| expectAddrs := []string{"127.0.0.1:1111"} | ||
| pdCli := &mockPDClient{ | ||
| members: []*pdpb.Member{{ | ||
| ClientUrls: []string{"http://127.0.0.1:1111"}, | ||
| }}, | ||
| } | ||
|
|
||
| serviceClient := newClient(etcdCli, pdCli) | ||
| addrs, err := serviceClient.GetPDAddrs(context.Background()) | ||
| require.NoError(t, err) | ||
| require.Equal(t, expectAddrs, addrs) | ||
|
|
||
| t.Run("empty client urls returns error", func(t *testing.T) { | ||
| pdCli := &mockPDClient{ | ||
| members: []*pdpb.Member{ | ||
| {}, | ||
| {ClientUrls: []string{}}, | ||
| }, | ||
| } | ||
|
|
||
| serviceClient := newClient(etcdCli, pdCli) | ||
| addrs, err := serviceClient.GetPDAddrs(context.Background()) | ||
| require.Error(t, err) | ||
| require.Nil(t, addrs) | ||
| require.EqualError(t, err, "no usable PD client URL found in PD members") | ||
| }) | ||
| } | ||
|
|
||
| // TestParseURL tests the ParseURL function with various inputs. | ||
| func TestParseURL(t *testing.T) { | ||
| tests := []struct { | ||
| rawURL string | ||
| prefix string | ||
| host string | ||
| port string | ||
| err bool | ||
| }{ | ||
| // Successful test cases | ||
| {"http://example.com:8080", "http://", "example.com", "8080", false}, | ||
| {"https://localhost:443", "https://", "localhost", "443", false}, // Specified port for HTTPS | ||
| {"http://[2001:db8::1]:2379", "http://", "2001:db8::1", "2379", false}, | ||
| {"https://[2001:db8::1]:443", "https://", "2001:db8::1", "443", false}, | ||
|
|
||
| // Unsuccessful test cases | ||
| {"ftp://example.com", "ftp://", "", "", true}, // Invalid prefix | ||
| {"unix://localhost:m0", "unix://", "", "", true}, // Unix schema is unsupported | ||
| {"unix://localhost", "unix://", "", "", true}, // Unix schema is unsupported | ||
| {"http://example.com:8080:extra", "http://", "", "", true}, // Extra part after port | ||
| {"https://:8080", "https://", "", "", true}, // Missing host | ||
| {"http://", "http://", "", "", true}, // Incomplete URL | ||
| {"https://example.com", "https://", "", "", true}, // Missing port | ||
| {"http://localhost", "http://", "", "", true}, // Missing port | ||
| {"https://[2001:db8::1]", "https://", "", "", true}, // Missing port | ||
| {"http://2001:db8::1:2379", "http://", "", "", true}, // Unbracketed IPv6 with port | ||
| {"https://[2001:db8::1", "https://", "", "", true}, // Invalid bracketed IPv6 | ||
| } | ||
|
|
||
| for _, test := range tests { | ||
| prefix, host, port, err := ParseURL(test.rawURL) | ||
|
|
||
| // Check if the error status matches the expectation | ||
| if test.err { | ||
| require.Error(t, err, "Expected an error for input: "+test.rawURL) | ||
| require.Empty(t, prefix, "Expected an error for input: "+test.rawURL) | ||
| require.Empty(t, host, "Host should be empty for input: "+test.rawURL) | ||
| require.Empty(t, port, "Port should be empty for input: "+test.rawURL) | ||
| } else { | ||
| require.NoError(t, err, "Did not expect an error for input: "+test.rawURL) | ||
| require.Equal(t, test.prefix, prefix, "prefix mismatch for input: "+test.rawURL) | ||
| require.Equal(t, test.host, host, "Host mismatch for input: "+test.rawURL) | ||
| require.Equal(t, test.port, port, "Port mismatch for input: "+test.rawURL) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why
The new metaservice package reimplements PD member retry and ClientUrls parsing instead of sharing the existing TiKV store path that already derives client endpoints from the same PD members.
Scope
pkg/metaservice/etcd.go:62 and pkg/store/driver/tikv_driver.go:300
Risk if unchanged
The two paths can drift on retry context, config override behavior, URL validation, scheme handling, and empty-member behavior. A future meta-service caller may get different endpoints than the existing store helper for the same PD members.
Evidence
GetPDHostPorts builds a tikv Backoffer, calls pdClient.GetAllMembers, parses member.ClientUrls[0], and appends host or host-with-scheme. tikvStore.EtcdAddrs already builds a Backoffer, calls s.GetPDClient().GetAllMembers, parses the same member.ClientUrls[0], and appends u.Host.
Change request
Can we keep one canonical helper for PD member endpoint extraction and have both call sites use it?