diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index ab5f8cbf93..96038f0a94 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -37,18 +37,49 @@ import ( ) type dataListener struct { - interestedURL []*common.URL - listener config_center.ConfigurationListener + mutex sync.Mutex + subscribed map[string]config_center.ConfigurationListener + closed bool } // NewRegistryDataListener creates a data listener for etcd -func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { - return &dataListener{listener: listener} +func NewRegistryDataListener() *dataListener { + return &dataListener{ + subscribed: make(map[string]config_center.ConfigurationListener), + } +} + +// SubscribeURL sets a watch listener for url. +func (l *dataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) { + l.mutex.Lock() + defer l.mutex.Unlock() + l.subscribeURLLocked(url, listener) +} + +func (l *dataListener) subscribeURLLocked(url *common.URL, listener config_center.ConfigurationListener) { + if l.closed { + return + } + l.subscribed[url.ServiceKey()] = listener } -// AddInterestedURL adds a registration @url to listen -func (l *dataListener) AddInterestedURL(url *common.URL) { - l.interestedURL = append(l.interestedURL, url) +// UnSubscribeURL unsets a watch listener for url. +func (l *dataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { + l.mutex.Lock() + defer l.mutex.Unlock() + return l.unsubscribeURLLocked(url) +} + +func (l *dataListener) unsubscribeURLLocked(url *common.URL) config_center.ConfigurationListener { + if l.closed { + return nil + } + listener := l.subscribed[url.ServiceKey()] + if listener != nil { + closeConfigurationListener(listener) + delete(l.subscribed, url.ServiceKey()) + } + return listener } // DataChange processes the data change event from registry center of etcd @@ -65,36 +96,73 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { return false } - for _, v := range l.interestedURL { - if serviceURL.URLEqual(v) { - l.listener.Process( + l.mutex.Lock() + defer l.mutex.Unlock() + if l.closed { + return false + } + match := false + for serviceKey, listener := range l.subscribed { + intf, group, version := common.ParseServiceKey(serviceKey) + if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) { + listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, - Value: serviceURL, + Value: serviceURL.Clone(), ConfigType: eventType.Action, }, ) - return true + match = true } } - return false + return match +} + +// Close closes all subscribed configuration listeners. +func (l *dataListener) Close() { + l.mutex.Lock() + defer l.mutex.Unlock() + l.closed = true + for _, listener := range l.subscribed { + closeConfigurationListener(listener) + } +} + +func closeConfigurationListener(listener config_center.ConfigurationListener) { + etcdListener, ok := listener.(*configurationListener) + if ok && etcdListener != nil { + etcdListener.Close() + } } type configurationListener struct { - registry *etcdV3Registry - events *gxchan.UnboundedChan - closeOnce sync.Once + registry *etcdV3Registry + events *gxchan.UnboundedChan + isClosed bool + close chan struct{} + closeOnce sync.Once + subscribeURL *common.URL } // NewConfigurationListener for listening the event of etcdv3. -func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { +func NewConfigurationListener(reg *etcdV3Registry, conf *common.URL) *configurationListener { // add a new waiter reg.WaitGroup().Add(1) - return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)} + return &configurationListener{ + registry: reg, + events: gxchan.NewUnboundedChan(32), + close: make(chan struct{}), + subscribeURL: conf, + } } // Process data change event from config center of etcd func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { + select { + case <-l.close: + return + default: + } l.events.In() <- configType } @@ -102,6 +170,9 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { + case <-l.close: + return nil, perrors.New("listener has been closed") + case <-l.registry.Done(): logger.Warn("[Registry][Etcdv3] listener's etcd client connection is broken, so etcd event listener exit now") return nil, perrors.New("listener stopped") @@ -109,12 +180,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { case val := <-l.events.Out(): e, _ := val.(*config_center.ConfigChangeEvent) logger.Infof("[Registry][Etcdv3] got etcd event %#v", e) - if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() { - select { - case <-l.registry.Done(): - logger.Warnf("[Registry][Etcdv3] update @result{%s}. But its connection to registry is invalid", e.Value) - default: - } + if l.shouldIgnoreDeleteEvent(e) { continue } return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil @@ -122,9 +188,33 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } +func (l *configurationListener) shouldIgnoreDeleteEvent(e *config_center.ConfigChangeEvent) bool { + if e.ConfigType != remoting.EventTypeDel { + return false + } + select { + case <-l.registry.Done(): + logger.Warnf("[Registry][Etcdv3] update @result{%s}. But its connection to registry is invalid", e.Value) + return true + default: + } + return l.registry.client == nil || !validEtcdClient(l.registry.client) +} + +func (l *configurationListener) closed() bool { + select { + case <-l.close: + return true + default: + return false + } +} + // Close etcd registry center func (l *configurationListener) Close() { l.closeOnce.Do(func() { + l.isClosed = true + close(l.close) l.registry.WaitGroup().Done() }) } diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go index 4a7223ba40..fb22c0fc76 100644 --- a/registry/etcdv3/listener_test.go +++ b/registry/etcdv3/listener_test.go @@ -18,43 +18,272 @@ package etcdv3 import ( + "net/url" + "testing" +) + +import ( + gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/remoting" ) type MockDataListener struct{} func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {} -/* -func Test_dataListener_DataChange(t *testing.T) { - tests := []struct { - name string - fields dataListenerFields - args args - want bool - }{ - { - name: "test", - fields: dataListenerFields{ - interestedURL: nil, - listener: &MockDataListener{}, - }, - args: args{ - eventType: remoting.Event{ - Path: "com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100", - }, - }, - want: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := newDataListener(tt.fields) - if got := l.DataChange(tt.args.eventType); got != tt.want { - t.Errorf("DataChange() = %v, want %v", got, tt.want) - } - }) - } -} - -*/ +func TestDataListenerDataChangeDispatchesToSubscribedService(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, listener) + + ok := dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/providers/" + url.QueryEscape(serviceURL.String()), + Action: remoting.EventTypeAdd, + }) + require.True(t, ok) + + event, err := listener.Next() + require.NoError(t, err) + assert.Equal(t, remoting.EventTypeAdd, event.Action) + assert.Equal(t, serviceURL.ServiceKey(), event.Service.ServiceKey()) +} + +func TestDataListenerUnsubscribeStopsDispatch(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, listener) + removed := dataListener.UnSubscribeURL(serviceURL) + + require.Same(t, listener, removed) + require.True(t, listener.isClosed) + assert.False(t, dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/providers/" + url.QueryEscape(serviceURL.String()), + Action: remoting.EventTypeAdd, + })) +} + +func TestDataListenerUnsubscribeHandlesNonConfigurationListener(t *testing.T) { + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := &MockDataListener{} + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, listener) + + var removed config_center.ConfigurationListener + require.NotPanics(t, func() { + removed = dataListener.UnSubscribeURL(serviceURL) + }) + + require.Same(t, listener, removed) + assert.Empty(t, dataListener.subscribed) +} + +func TestDataListenerCloseClosesSubscriptions(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, listener) + dataListener.Close() + + require.True(t, dataListener.closed) + require.True(t, listener.isClosed) +} + +func TestDataListenerCloseHandlesNonConfigurationListener(t *testing.T) { + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, &MockDataListener{}) + + require.NotPanics(t, dataListener.Close) + assert.True(t, dataListener.closed) +} + +func TestDataListenerIgnoresClosedSubscriptions(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + dataListener := NewRegistryDataListener() + dataListener.Close() + + dataListener.SubscribeURL(serviceURL, listener) + + assert.Empty(t, dataListener.subscribed) + assert.Nil(t, dataListener.UnSubscribeURL(serviceURL)) + assert.False(t, dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/providers/" + url.QueryEscape(serviceURL.String()), + Action: remoting.EventTypeAdd, + })) +} + +func TestDataListenerDataChangeRejectsInvalidEvents(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(serviceURL, listener) + + assert.False(t, dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/consumers/" + url.QueryEscape(serviceURL.String()), + Action: remoting.EventTypeAdd, + })) + assert.False(t, dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/providers/%", + Action: remoting.EventTypeAdd, + })) + + otherServiceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.OtherProvider?group=g&version=v") + assert.False(t, dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.OtherProvider/providers/" + url.QueryEscape(otherServiceURL.String()), + Action: remoting.EventTypeAdd, + })) +} + +func TestDataListenerDataChangeDispatchesWildcardSubscription(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + wildcardURL := mustURL(t, "dubbo://127.0.0.1:20000/*?group=*&version=*") + listener := NewConfigurationListener(reg, wildcardURL) + defer listener.Close() + + dataListener := NewRegistryDataListener() + dataListener.SubscribeURL(wildcardURL, listener) + + ok := dataListener.DataChange(remoting.Event{ + Path: "/dubbo/org.apache.demo.UserProvider/providers/" + url.QueryEscape(serviceURL.String()), + Action: remoting.EventTypeUpdate, + }) + require.True(t, ok) + + event, err := listener.Next() + require.NoError(t, err) + assert.Equal(t, remoting.EventTypeUpdate, event.Action) + assert.Equal(t, serviceURL.ServiceKey(), event.Service.ServiceKey()) +} + +func TestConfigurationListenerCloseStopsProcessAndNext(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + + listener.Close() + listener.Process(&config_center.ConfigChangeEvent{ + Key: serviceURL.String(), + Value: serviceURL, + ConfigType: remoting.EventTypeAdd, + }) + event, err := listener.Next() + + require.Error(t, err) + assert.Nil(t, event) + assert.ErrorContains(t, err, "listener has been closed") +} + +func TestConfigurationListenerNextStopsWhenRegistryDone(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + close(reg.Done()) + event, err := listener.Next() + + require.Error(t, err) + assert.Nil(t, event) + assert.ErrorContains(t, err, "listener stopped") +} + +func TestConfigurationListenerNextReturnsDeleteEventWhenClientValid(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.client = &gxetcd.Client{} + restore := stubValidEtcdClient(true) + defer restore() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + listener.Process(&config_center.ConfigChangeEvent{ + Key: serviceURL.String(), + Value: serviceURL, + ConfigType: remoting.EventTypeDel, + }) + event, err := listener.Next() + + require.NoError(t, err) + assert.Equal(t, remoting.EventTypeDel, event.Action) +} + +func TestConfigurationListenerNextSkipsDeleteEventWhenClientInvalid(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + + listener.Process(&config_center.ConfigChangeEvent{ + Key: serviceURL.String(), + Value: serviceURL, + ConfigType: remoting.EventTypeDel, + }) + listener.Process(&config_center.ConfigChangeEvent{ + Key: serviceURL.String(), + Value: serviceURL, + ConfigType: remoting.EventTypeAdd, + }) + event, err := listener.Next() + + require.NoError(t, err) + assert.Equal(t, remoting.EventTypeAdd, event.Action) +} + +func TestConfigurationListenerShouldIgnoreDeleteEventAfterRegistryDone(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.client = &gxetcd.Client{} + restore := stubValidEtcdClient(true) + defer restore() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + close(reg.Done()) + + ignore := listener.shouldIgnoreDeleteEvent(&config_center.ConfigChangeEvent{ + Key: serviceURL.String(), + Value: serviceURL, + ConfigType: remoting.EventTypeDel, + }) + + assert.True(t, ignore) +} + +func newTestEtcdRegistry(t *testing.T) *etcdV3Registry { + t.Helper() + registryURL := mustURL(t, "etcdv3://127.0.0.1:2379") + reg := &etcdV3Registry{} + reg.InitBaseRegistry(registryURL, reg) + return reg +} + +func mustURL(t *testing.T, rawURL string) *common.URL { + t.Helper() + parsedURL, err := common.NewURL(rawURL) + require.NoError(t, err) + return parsedURL +} diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index eaa93c6269..6feb592e30 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -43,18 +43,23 @@ const ( Name = "etcdv3" ) +var ( + listenServiceEvent = (*etcdv3.EventListener).ListenServiceEvent + validEtcdClient = (*gxetcd.Client).Valid + deleteEtcdKey = (*gxetcd.Client).Delete +) + func init() { extension.SetRegistry(Name, newETCDV3Registry) } type etcdV3Registry struct { registry.BaseRegistry - cltLock sync.Mutex - client *gxetcd.Client - listenerLock sync.RWMutex - listener *etcdv3.EventListener - dataListener *dataListener - configListener *configurationListener + cltLock sync.Mutex + client *gxetcd.Client + listenerLock sync.RWMutex + listener *etcdv3.EventListener + dataListener *dataListener } // Client gets the etcdv3 client @@ -99,8 +104,24 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { // InitListeners init listeners of etcd registry center func (r *etcdV3Registry) InitListeners() { r.listener = etcdv3.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) + newDataListener := NewRegistryDataListener() + if r.dataListener != nil { + oldDataListener := r.dataListener + oldDataListener.mutex.Lock() + defer oldDataListener.mutex.Unlock() + oldDataListener.closed = true + for _, oldListener := range oldDataListener.subscribed { + etcdListener, ok := oldListener.(*configurationListener) + if !ok || etcdListener == nil || etcdListener.subscribeURL == nil { + continue + } + etcdListener.Close() + newListener := NewConfigurationListener(r, etcdListener.subscribeURL) + newDataListener.SubscribeURL(etcdListener.subscribeURL, newListener) + go listenServiceEvent(r.listener, etcdProviderPath(etcdListener.subscribeURL), newDataListener) + } + } + r.dataListener = newDataListener } // DoRegister actually do the register job in the registry center of etcd @@ -109,9 +130,13 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.RegisterTemp(path.Join(root, node), "") } -// DoUnregister is not supported in etcdV3Registry. func (r *etcdV3Registry) DoUnregister(root string, node string) error { - return perrors.New("DoUnregister is not support in etcdV3Registry") + r.cltLock.Lock() + defer r.cltLock.Unlock() + if r.client == nil || !validEtcdClient(r.client) { + return perrors.New("etcd client is not valid") + } + return deleteEtcdKey(r.client, path.Join(root, node)) } // CloseAndNilClient closes listeners and clear client @@ -122,8 +147,8 @@ func (r *etcdV3Registry) CloseAndNilClient() { // CloseListener closes listeners func (r *etcdV3Registry) CloseListener() { - if r.configListener != nil { - r.configListener.Close() + if r.dataListener != nil { + r.dataListener.Close() } } @@ -142,9 +167,21 @@ func (r *etcdV3Registry) CreatePath(k string) error { // DoSubscribe actually subscribe the provider URL func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { - r.listenerLock.RLock() - configListener := r.configListener - r.listenerLock.RUnlock() + if r.dataListener == nil { + r.dataListener = NewRegistryDataListener() + } + r.dataListener.mutex.Lock() + defer r.dataListener.mutex.Unlock() + if listener := r.dataListener.subscribed[svc.ServiceKey()]; listener != nil { + etcdListener, _ := listener.(*configurationListener) + if etcdListener != nil { + if etcdListener.closed() { + return nil, perrors.New("configListener already been closed") + } + return etcdListener, nil + } + } + if r.listener == nil { r.cltLock.Lock() client := r.client @@ -158,14 +195,41 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) } // register the svc to dataListener - r.dataListener.AddInterestedURL(svc) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()), r.dataListener) + configListener := NewConfigurationListener(r, svc) + r.dataListener.subscribeURLLocked(svc, configListener) + go listenServiceEvent(r.listener, etcdProviderPath(svc), r.dataListener) return configListener, nil } func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { - return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry") + if r.dataListener == nil { + return nil, perrors.New("etcd data listener is nil, can not close") + } + if r.listener == nil { + return nil, perrors.New("etcd event listener is nil, can not close") + } + r.dataListener.mutex.Lock() + subscribedListener := r.dataListener.subscribed[conf.ServiceKey()] + if subscribedListener != nil { + etcdListener, _ := subscribedListener.(*configurationListener) + if etcdListener != nil && etcdListener.closed() { + r.dataListener.mutex.Unlock() + return nil, perrors.Errorf("configListener for service %s has already been closed", conf.ServiceKey()) + } + } + if subscribedListener == nil { + r.dataListener.mutex.Unlock() + return nil, nil + } + registryListener, ok := subscribedListener.(registry.Listener) + if !ok { + r.dataListener.mutex.Unlock() + return nil, perrors.Errorf("listener for service %s is not a registry listener", conf.ServiceKey()) + } + r.dataListener.unsubscribeURLLocked(conf) + r.dataListener.mutex.Unlock() + return registryListener, nil } // LoadSubscribeInstances load subscribe instance @@ -177,3 +241,7 @@ func (r *etcdV3Registry) handleClientRestart() { r.WaitGroup().Add(1) go etcdv3.HandleClientRestart(r) } + +func etcdProviderPath(svc *common.URL) string { + return fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()) +} diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 02c718f323..e88098ba2b 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -18,105 +18,285 @@ // Package etcdv3 contains tests for etcdv3 registry components. package etcdv3 -/* import ( - "reflect" - "sync" "testing" ) import ( - "github.com/agiledragon/gomonkey" - gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) import ( - "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" - "dubbo.apache.org/dubbo-go/v3/remoting/etcdv3" + remotingEtcdv3 "dubbo.apache.org/dubbo-go/v3/remoting/etcdv3" ) -type fields struct { - BaseRegistry registry.BaseRegistry - cltLock sync.Mutex - client *gxetcd.Client - listenerLock sync.RWMutex - listener *etcdv3.EventListener - dataListener *dataListener - configListener *configurationListener -} -type args struct { - root string - node string - eventType remoting.Event -} - -func newEtcdV3Registry(f fields) *etcdV3Registry { - return &etcdV3Registry{ - client: f.client, - listener: f.listener, - dataListener: f.dataListener, - configListener: f.configListener, - } +func TestEtcdV3RegistryDoUnregisterRejectsInvalidClient(t *testing.T) { + reg := newTestEtcdRegistry(t) + + err := reg.DoUnregister("/dubbo", "org.apache.demo.UserProvider") + + require.Error(t, err) + assert.ErrorContains(t, err, "etcd client is not valid") } -func Test_etcdV3Registry_DoRegister(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "RegisterTemp", func(_ *gxetcd.Client, k, v string) error { +func TestEtcdV3RegistryDoUnregisterDeletesValidClientKey(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.client = &gxetcd.Client{} + restoreValid := stubValidEtcdClient(true) + defer restoreValid() + originalDelete := deleteEtcdKey + var deletedKey string + deleteEtcdKey = func(_ *gxetcd.Client, key string) error { + deletedKey = key return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - }, - args: args{ - root: "/dubbo", - node: "/go", - }, - wantErr: false, - }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := newEtcdV3Registry(tt.fields) - if err := r.DoRegister(tt.args.root, tt.args.node); (err != nil) != tt.wantErr { - t.Errorf("DoRegister() error = %v, wantErr %v", err, tt.wantErr) - } - }) + defer func() { + deleteEtcdKey = originalDelete + }() + + err := reg.DoUnregister("/dubbo", "org.apache.demo.UserProvider") + + require.NoError(t, err) + assert.Equal(t, "/dubbo/org.apache.demo.UserProvider", deletedKey) +} + +func TestEtcdV3RegistryDoUnsubscribeClosesServiceListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.listener = remotingEtcdv3.NewEventListener(nil) + reg.dataListener = NewRegistryDataListener() + + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + reg.dataListener.SubscribeURL(serviceURL, listener) + + removed, err := reg.DoUnsubscribe(serviceURL) + + require.NoError(t, err) + require.Same(t, listener, removed) + assert.True(t, listener.isClosed) + _, ok := reg.dataListener.subscribed[serviceURL.ServiceKey()] + assert.False(t, ok) +} + +func TestEtcdV3RegistryInitListenersWithoutExistingSubscriptions(t *testing.T) { + reg := newTestEtcdRegistry(t) + + reg.InitListeners() + + assert.NotNil(t, reg.listener) + require.NotNil(t, reg.dataListener) + assert.Empty(t, reg.dataListener.subscribed) +} + +func TestEtcdV3RegistryInitListenersRecoversExistingSubscriptions(t *testing.T) { + reg := newTestEtcdRegistry(t) + restore := stubListenServiceEvent() + defer restore() + reg.dataListener = NewRegistryDataListener() + + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + oldListener := NewConfigurationListener(reg, serviceURL) + reg.dataListener.SubscribeURL(serviceURL, oldListener) + oldDataListener := reg.dataListener + + reg.InitListeners() + + require.True(t, oldDataListener.closed) + require.True(t, oldListener.isClosed) + require.NotNil(t, reg.dataListener) + recovered := reg.dataListener.subscribed[serviceURL.ServiceKey()] + require.NotNil(t, recovered) + require.NotSame(t, oldListener, recovered) + recovered.(*configurationListener).Close() +} + +func TestEtcdV3RegistryInitListenersSkipsInvalidSubscriptions(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + reg.dataListener.SubscribeURL(serviceURL, &MockDataListener{}) + + reg.InitListeners() + + require.NotNil(t, reg.dataListener) + assert.Empty(t, reg.dataListener.subscribed) +} + +func TestEtcdV3RegistryCloseListenerClosesDataListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + + reg.CloseListener() + + assert.True(t, reg.dataListener.closed) +} + +func TestEtcdV3RegistryDoSubscribeReturnsExistingListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + reg.dataListener.SubscribeURL(serviceURL, listener) + + existing, err := reg.DoSubscribe(serviceURL) + + require.NoError(t, err) + assert.Same(t, listener, existing) +} + +func TestEtcdV3RegistryDoSubscribeRejectsClosedExistingListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + listener.Close() + reg.dataListener.SubscribeURL(serviceURL, listener) + + existing, err := reg.DoSubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, existing) + assert.ErrorContains(t, err, "configListener already been closed") +} + +func TestEtcdV3RegistryDoSubscribeRejectsBrokenClient(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + listener, err := reg.DoSubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, listener) + require.ErrorContains(t, err, "etcd client broken") + require.NotNil(t, reg.dataListener) +} + +func TestEtcdV3RegistryDoSubscribeRegistersNewListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + restore := stubListenServiceEvent() + defer restore() + reg.client = &gxetcd.Client{} + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + listener, err := reg.DoSubscribe(serviceURL) + + require.NoError(t, err) + require.NotNil(t, listener) + defer listener.(*configurationListener).Close() + assert.NotNil(t, reg.listener) + assert.Same(t, listener, reg.dataListener.subscribed[serviceURL.ServiceKey()]) +} + +func TestEtcdV3RegistryDoUnsubscribeRejectsNilDataListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + listener, err := reg.DoUnsubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, listener) + assert.ErrorContains(t, err, "etcd data listener is nil") +} + +func TestEtcdV3RegistryDoUnsubscribeRejectsNilEventListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + listener, err := reg.DoUnsubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, listener) + assert.ErrorContains(t, err, "etcd event listener is nil") +} + +func TestEtcdV3RegistryDoUnsubscribeKeepsSubscriptionWhenEventListenerNil(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.dataListener = NewRegistryDataListener() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + defer listener.Close() + reg.dataListener.SubscribeURL(serviceURL, listener) + + removed, err := reg.DoUnsubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, removed) + require.ErrorContains(t, err, "etcd event listener is nil") + assert.Same(t, listener, reg.dataListener.subscribed[serviceURL.ServiceKey()]) + assert.False(t, listener.closed()) +} + +func TestEtcdV3RegistryDoUnsubscribeIgnoresMissingSubscription(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.listener = remotingEtcdv3.NewEventListener(nil) + reg.dataListener = NewRegistryDataListener() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + listener, err := reg.DoUnsubscribe(serviceURL) + + require.NoError(t, err) + assert.Nil(t, listener) +} + +func TestEtcdV3RegistryDoUnsubscribeRejectsClosedSubscription(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.listener = remotingEtcdv3.NewEventListener(nil) + reg.dataListener = NewRegistryDataListener() + + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + listener := NewConfigurationListener(reg, serviceURL) + listener.Close() + reg.dataListener.SubscribeURL(serviceURL, listener) + + removed, err := reg.DoUnsubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, removed) + assert.ErrorContains(t, err, "has already been closed") +} + +func TestEtcdV3RegistryDoUnsubscribeRejectsNonRegistryListener(t *testing.T) { + reg := newTestEtcdRegistry(t) + reg.listener = remotingEtcdv3.NewEventListener(nil) + reg.dataListener = NewRegistryDataListener() + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + reg.dataListener.SubscribeURL(serviceURL, &MockDataListener{}) + + listener, err := reg.DoUnsubscribe(serviceURL) + + require.Error(t, err) + assert.Nil(t, listener) + assert.ErrorContains(t, err, "not a registry listener") +} + +func TestEtcdProviderPath(t *testing.T) { + serviceURL := mustURL(t, "dubbo://127.0.0.1:20000/org.apache.demo.UserProvider?group=g&version=v") + + assert.Equal(t, "/dubbo/org.apache.demo.UserProvider/providers", etcdProviderPath(serviceURL)) +} + +func stubListenServiceEvent() func() { + original := listenServiceEvent + listenServiceEvent = func(_ *remotingEtcdv3.EventListener, _ string, _ remoting.DataListener) {} + return func() { + listenServiceEvent = original } } -func Test_etcdV3Registry_DoUnregister(t *testing.T) { - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - wantErr: true, - }, +func stubValidEtcdClient(valid bool) func() { + original := validEtcdClient + validEtcdClient = func(_ *gxetcd.Client) bool { + return valid } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := newEtcdV3Registry(tt.fields) - if err := r.DoUnregister(tt.args.root, tt.args.node); (err != nil) != tt.wantErr { - t.Errorf("DoUnregister() error = %v, wantErr %v", err, tt.wantErr) - } - }) + return func() { + validEtcdClient = original } } - -*/