Skip to content
Draft
4 changes: 2 additions & 2 deletions cluster/utils/adaptivesvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func DoesAdaptiveServiceReachLimitation(err error) bool {
if err == nil {
return false
}
return err.Error() == ReachLimitationErrorString
return strings.HasSuffix(err.Error(), ReachLimitationErrorString)
}

func IsAdaptiveServiceFailed(err error) bool {
if err == nil {
return false
}
return strings.HasPrefix(err.Error(), adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
return strings.Contains(err.Error(), adaptivesvc.ErrAdaptiveSvcInterrupted.Error())
}
41 changes: 41 additions & 0 deletions cluster/utils/adaptivesvc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package utils

import (
"errors"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

func TestDoesAdaptiveServiceReachLimitation(t *testing.T) {
assert.False(t, DoesAdaptiveServiceReachLimitation(nil))
assert.True(t, DoesAdaptiveServiceReachLimitation(errors.New(ReachLimitationErrorString)))
assert.True(t, DoesAdaptiveServiceReachLimitation(errors.New("unknown: "+ReachLimitationErrorString)))
assert.False(t, DoesAdaptiveServiceReachLimitation(errors.New("adaptive service interrupted: another error")))
}

func TestIsAdaptiveServiceFailed(t *testing.T) {
assert.False(t, IsAdaptiveServiceFailed(nil))
assert.True(t, IsAdaptiveServiceFailed(errors.New("adaptive service interrupted: reach limitation")))
assert.True(t, IsAdaptiveServiceFailed(errors.New("unknown: adaptive service interrupted: reach limitation")))
assert.False(t, IsAdaptiveServiceFailed(errors.New("unavailable: network error")))
}
12 changes: 2 additions & 10 deletions filter/adaptivesvc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,8 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker base

func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, res result.Result, invoker base.Invoker,
invocation base.Invocation) result.Result {
var asEnabled string
asEnabledIface := res.Attachment(constant.AdaptiveServiceEnabledKey, nil)
if asEnabledIface != nil {
if str, strOK := asEnabledIface.(string); strOK {
asEnabled = str
} else if strArr, strArrOK := asEnabledIface.([]string); strArrOK && len(strArr) > 0 {
asEnabled = strArr[0]
}
}
if asEnabled != constant.AdaptiveServiceIsEnabled {
if invocation.GetAttachmentWithDefaultValue(constant.AdaptiveServiceEnabledKey, "") !=
constant.AdaptiveServiceIsEnabled {
// the adaptive service is enabled on the invocation
return res
}
Expand Down
9 changes: 6 additions & 3 deletions filter/adaptivesvc/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func TestAdaptiveServiceProviderFilter_OnResponse(t *testing.T) {
})

t.Run("InterruptedErrorShouldSkip", func(t *testing.T) {
invoc := invocation.NewRPCInvocation(methodName, nil, nil)
invoc := invocation.NewRPCInvocation(methodName, nil, map[string]any{
constant.AdaptiveServiceEnabledKey: constant.AdaptiveServiceIsEnabled,
})
res := &result.RPCResult{Err: wrapErrAdaptiveSvcInterrupted("limit exceeded")}
invoker := mock.NewMockInvoker(ctrl)

Expand All @@ -109,12 +111,13 @@ func TestAdaptiveServiceProviderFilter_OnResponse(t *testing.T) {
})

t.Run("SuccessWithAttachments", func(t *testing.T) {
invoc := invocation.NewRPCInvocation(methodName, nil, nil)
invoc := invocation.NewRPCInvocation(methodName, nil, map[string]any{
constant.AdaptiveServiceEnabledKey: constant.AdaptiveServiceIsEnabled,
})
updater := &mockUpdater{}
invoc.SetAttribute(constant.AdaptiveServiceUpdaterKey, updater)

res := &result.RPCResult{Rest: "ok"}
res.AddAttachment(constant.AdaptiveServiceEnabledKey, constant.AdaptiveServiceIsEnabled)

invoker := mock.NewMockInvoker(ctrl)
invoker.EXPECT().GetURL().Return(u).AnyTimes()
Expand Down
14 changes: 14 additions & 0 deletions filter/adaptivesvc/limiter/hill_climbing.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ func (l *HillClimbing) Remaining() uint64 {
return limitation - inflight
}

func (l *HillClimbing) Snapshot() Snapshot {
limitation := l.limitation.Load()
inflight := l.inflight.Load()
remaining := uint64(0)
if limitation >= inflight {
remaining = limitation - inflight
}
return Snapshot{
Inflight: inflight,
Remaining: remaining,
Limitation: limitation,
}
}

func (l *HillClimbing) Acquire() (Updater, error) {
if l.Remaining() == 0 {
return nil, ErrReachLimitation
Expand Down
17 changes: 17 additions & 0 deletions filter/adaptivesvc/limiter/hill_climbing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,20 @@ func TestHillClimbing_Remaining(t *testing.T) {
remaining = limiter.Remaining()
assert.Equal(t, uint64(0), remaining)
}

func TestHillClimbing_Snapshot(t *testing.T) {
limiter := NewHillClimbing().(*HillClimbing)
limiter.limitation.Store(100)
limiter.inflight.Store(30)

snapshot := limiter.Snapshot()
assert.Equal(t, uint64(100), snapshot.Limitation)
assert.Equal(t, uint64(30), snapshot.Inflight)
assert.Equal(t, uint64(70), snapshot.Remaining)

limiter.inflight.Store(120)
snapshot = limiter.Snapshot()
assert.Equal(t, uint64(100), snapshot.Limitation)
assert.Equal(t, uint64(120), snapshot.Inflight)
assert.Equal(t, uint64(0), snapshot.Remaining)
}
7 changes: 7 additions & 0 deletions filter/adaptivesvc/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ const (
type Limiter interface {
Inflight() uint64
Remaining() uint64
Snapshot() Snapshot
// Acquire inspects the current status of the system:
// - if reaches the limitation, reject the request immediately.
// - if not, grant this request and return an Updater defined below.
Acquire() (Updater, error)
}

type Snapshot struct {
Inflight uint64
Remaining uint64
Limitation uint64
}

type Updater interface {
// DoUpdate is called once an invocation is finished, it tells Updater that the invocation is finished, and please
// update the Remaining, Inflight parameters of the Limiter.
Expand Down
22 changes: 20 additions & 2 deletions filter/adaptivesvc/limiter_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import (
"fmt"
"strings"
"sync"
)

Expand Down Expand Up @@ -51,7 +52,7 @@
}

func (m *limiterMapper) newAndSetMethodLimiter(url *common.URL, methodName string, limiterType int) (limiter.Limiter, error) {
key := fmt.Sprintf("%s%s", url.Path, methodName)
key := methodLimiterKey(url.Path, methodName)

var (
l limiter.Limiter
Expand All @@ -75,7 +76,7 @@

func (m *limiterMapper) getMethodLimiter(url *common.URL, methodName string) (
limiter.Limiter, error) {
key := fmt.Sprintf("%s%s", url.Path, methodName)
key := methodLimiterKey(url.Path, methodName)
m.rwMutex.RLock()
l, ok := limiterMapperSingleton.mapper[key]
m.rwMutex.RUnlock()
Expand All @@ -84,3 +85,20 @@
}
return l, nil
}

func GetMethodLimiterSnapshot(interfaceName string, methodName string) (limiter.Snapshot, bool) {

Check warning on line 89 in filter/adaptivesvc/limiter_mapper.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Group together these consecutive parameters of the same type.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZ51BwKfUJpQcxyEbkRI&open=AZ51BwKfUJpQcxyEbkRI&pullRequest=3347
limiterMapperSingleton.rwMutex.RLock()
l, ok := limiterMapperSingleton.mapper[methodLimiterKey(interfaceName, methodName)]
if !ok && !strings.HasPrefix(interfaceName, "/") {
l, ok = limiterMapperSingleton.mapper[methodLimiterKey("/"+interfaceName, methodName)]
}
limiterMapperSingleton.rwMutex.RUnlock()
if !ok {
return limiter.Snapshot{}, false
}
return l.Snapshot(), true
}

func methodLimiterKey(interfaceName string, methodName string) string {

Check warning on line 102 in filter/adaptivesvc/limiter_mapper.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Group together these consecutive parameters of the same type.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZ51BwKfUJpQcxyEbkRJ&open=AZ51BwKfUJpQcxyEbkRJ&pullRequest=3347
return fmt.Sprintf("%s%s", interfaceName, methodName)
}
40 changes: 40 additions & 0 deletions filter/adaptivesvc/limiter_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,43 @@ func TestLimiterMapper_getMethodLimiter(t *testing.T) {
assert.Nil(t, l)
assert.Equal(t, ErrLimiterNotFoundOnMapper, err)
}

func TestGetMethodLimiterSnapshot(t *testing.T) {
mapper := newLimiterMapper()
oldMapper := limiterMapperSingleton
limiterMapperSingleton = mapper
t.Cleanup(func() {
limiterMapperSingleton = oldMapper
})

url := &common.URL{Path: "/testService"}
methodName := "testMethod"
_, err := mapper.newAndSetMethodLimiter(url, methodName, limiter.HillClimbingLimiter)
require.NoError(t, err)

snapshot, ok := GetMethodLimiterSnapshot(url.Path, methodName)
require.True(t, ok)
assert.Equal(t, uint64(50), snapshot.Limitation)

snapshot, ok = GetMethodLimiterSnapshot("/unknownService", methodName)
assert.False(t, ok)
assert.Equal(t, limiter.Snapshot{}, snapshot)
}

func TestGetMethodLimiterSnapshot_WithInterfaceNameWithoutLeadingSlash(t *testing.T) {
mapper := newLimiterMapper()
oldMapper := limiterMapperSingleton
limiterMapperSingleton = mapper
t.Cleanup(func() {
limiterMapperSingleton = oldMapper
})

url := &common.URL{Path: "/testService"}
methodName := "testMethod"
_, err := mapper.newAndSetMethodLimiter(url, methodName, limiter.HillClimbingLimiter)
require.NoError(t, err)

snapshot, ok := GetMethodLimiterSnapshot("testService", methodName)
require.True(t, ok)
assert.Equal(t, uint64(50), snapshot.Limitation)
}
4 changes: 2 additions & 2 deletions graceful_shutdown/closing_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ func (h *closingEventHandler) HandleClosingEvent(event ClosingEvent) bool {
defaultClosingAckTracker.record(event, removed)
if isActiveClosingSource(event.Source) {
if removed {
logger.Infof("Graceful shutdown --- Active closing ack handled, source=%s service=%s address=%s instance=%s",
logger.Infof("[GracefulShutdown] active closing ack handled, source=%s service=%s address=%s instance=%s",
event.Source, event.ServiceKey, event.Address, event.InstanceKey)
} else {
logger.Warnf("Graceful shutdown --- Active closing ack missed local directory, source=%s service=%s address=%s instance=%s",
logger.Warnf("[GracefulShutdown] active closing ack missed local directory, source=%s service=%s address=%s instance=%s",
event.Source, event.ServiceKey, event.Address, event.InstanceKey)
}
}
Expand Down
2 changes: 1 addition & 1 deletion graceful_shutdown/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
func parseDuration(timeout string, desc string, def time.Duration) time.Duration {
res, err := time.ParseDuration(timeout)
if err != nil {
logger.Errorf("The %s configuration is invalid: %s, and we will use the default value: %s, err: %v",
logger.Errorf("[GracefulShutdown] the %s configuration is invalid: %s, and we will use the default value: %s, err=%v",
desc, timeout, def.String(), err)
res = def
}
Expand Down
Loading
Loading