Skip to content
138 changes: 114 additions & 24 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,49 @@ import (
)

type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
mutex sync.Mutex
Comment thread
leno23 marked this conversation as resolved.
subscribed map[string]config_center.ConfigurationListener
closed bool
}

// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
func NewRegistryDataListener() *dataListener {
return &dataListener{
subscribed: make(map[string]config_center.ConfigurationListener),
}
}

// SubscribeURL sets a watch listener for url.
func (l *dataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.subscribeURLLocked(url, listener)
}

func (l *dataListener) subscribeURLLocked(url *common.URL, listener config_center.ConfigurationListener) {
if l.closed {
return
}
l.subscribed[url.ServiceKey()] = listener
}

// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
// UnSubscribeURL unsets a watch listener for url.
func (l *dataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
l.mutex.Lock()
defer l.mutex.Unlock()
return l.unsubscribeURLLocked(url)
}

func (l *dataListener) unsubscribeURLLocked(url *common.URL) config_center.ConfigurationListener {
if l.closed {
return nil
}
listener := l.subscribed[url.ServiceKey()]
if listener != nil {
closeConfigurationListener(listener)
delete(l.subscribed, url.ServiceKey())
}
return listener
}

// DataChange processes the data change event from registry center of etcd
Expand All @@ -65,66 +96,125 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(v) {
l.listener.Process(
l.mutex.Lock()
defer l.mutex.Unlock()
if l.closed {
return false
}
match := false
for serviceKey, listener := range l.subscribed {
intf, group, version := common.ParseServiceKey(serviceKey)
if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
Value: serviceURL.Clone(),
ConfigType: eventType.Action,
},
)
return true
match = true
}
}
return false
return match
}

// Close closes all subscribed configuration listeners.
func (l *dataListener) Close() {
l.mutex.Lock()
defer l.mutex.Unlock()
l.closed = true
for _, listener := range l.subscribed {
closeConfigurationListener(listener)
}
}

func closeConfigurationListener(listener config_center.ConfigurationListener) {
etcdListener, ok := listener.(*configurationListener)
if ok && etcdListener != nil {
etcdListener.Close()
}
}

type configurationListener struct {
registry *etcdV3Registry
events *gxchan.UnboundedChan
closeOnce sync.Once
registry *etcdV3Registry
events *gxchan.UnboundedChan
isClosed bool
close chan struct{}
closeOnce sync.Once
subscribeURL *common.URL
}

// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
func NewConfigurationListener(reg *etcdV3Registry, conf *common.URL) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
return &configurationListener{
registry: reg,
events: gxchan.NewUnboundedChan(32),
close: make(chan struct{}),
subscribeURL: conf,
}
}

// Process data change event from config center of etcd
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
select {
case <-l.close:
return
default:
}
l.events.In() <- configType
}

// Next returns next service event once received
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.close:
return nil, perrors.New("listener has been closed")

case <-l.registry.Done():
logger.Warn("[Registry][Etcdv3] listener's etcd client connection is broken, so etcd event listener exit now")
return nil, perrors.New("listener stopped")

case val := <-l.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
logger.Infof("[Registry][Etcdv3] got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() {
select {
case <-l.registry.Done():
logger.Warnf("[Registry][Etcdv3] update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
if l.shouldIgnoreDeleteEvent(e) {
continue
}
return &registry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil
}
}
}

func (l *configurationListener) shouldIgnoreDeleteEvent(e *config_center.ConfigChangeEvent) bool {
if e.ConfigType != remoting.EventTypeDel {
return false
}
select {
Comment thread
leno23 marked this conversation as resolved.
case <-l.registry.Done():
logger.Warnf("[Registry][Etcdv3] update @result{%s}. But its connection to registry is invalid", e.Value)
return true
default:
}
return l.registry.client == nil || !validEtcdClient(l.registry.client)
}

func (l *configurationListener) closed() bool {
select {
case <-l.close:
return true
default:
return false
}
}

// Close etcd registry center
func (l *configurationListener) Close() {
l.closeOnce.Do(func() {
l.isClosed = true
close(l.close)
l.registry.WaitGroup().Done()
})
}
Loading
Loading