Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cluster/router/affinity/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func newServiceAffinityRoute() *ServiceAffinityRoute {
return &ServiceAffinityRoute{}
}

func (s *ServiceAffinityRoute) SetStaticConfig(cfg *global.RouterConfig) {
if cfg == nil || cfg.Scope != constant.RouterScopeService {
Comment thread
leno23 marked this conversation as resolved.
Outdated
return
}
s.affinityRoute.SetStaticConfig(cfg)
}

func (s *ServiceAffinityRoute) Notify(invokers []base.Invoker) {
if len(invokers) == 0 {
return
Expand Down Expand Up @@ -102,6 +109,13 @@ func newApplicationAffinityRouter(url *common.URL) *ApplicationAffinityRoute {
return a
}

func (s *ApplicationAffinityRoute) SetStaticConfig(cfg *global.RouterConfig) {
if cfg == nil || cfg.Scope != constant.RouterScopeApplication {
return
}
s.affinityRoute.SetStaticConfig(cfg)
}

func (s *ApplicationAffinityRoute) Notify(invokers []base.Invoker) {
if len(invokers) == 0 {
return
Expand Down Expand Up @@ -150,6 +164,38 @@ type affinityRoute struct {
ratio int32
}

// SetStaticConfig applies a RouterConfig directly, bypassing YAML parsing.
// Static and dynamic rules are not merged: later Process updates replace the
// current state built here.
func (a *affinityRoute) SetStaticConfig(cfg *global.RouterConfig) {
Comment thread
leno23 marked this conversation as resolved.
a.mu.Lock()
defer a.mu.Unlock()

a.matcher, a.enabled, a.key, a.ratio = nil, false, "", 0
if cfg == nil {
return
}
if cfg.AffinityAware.Ratio < 0 || cfg.AffinityAware.Ratio > 100 {
logger.Errorf("[Router][Affinity] invalid static affinity ratio: ratio=%d, expected=0-100", cfg.AffinityAware.Ratio)
return
}

key := strings.TrimSpace(cfg.AffinityAware.Key)
enabled := cfg.Enabled == nil || *cfg.Enabled
if !enabled || key == "" {
return
}

rule := strings.Join([]string{key, key}, "=$")
f, err := condition.NewFieldMatcher(rule)
if err != nil {
logger.Errorf("[Router][Affinity] parse static affinity rule failed: key=%s, rule=%s, err=%v", key, rule, err)
return
}
Comment thread
leno23 marked this conversation as resolved.

a.matcher, a.enabled, a.key, a.ratio = &f, true, key, cfg.AffinityAware.Ratio
}

func (a *affinityRoute) Process(event *config_center.ConfigChangeEvent) {
a.mu.Lock()
defer a.mu.Unlock()
Expand Down
109 changes: 109 additions & 0 deletions cluster/router/affinity/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/global"
"dubbo.apache.org/dubbo-go/v3/protocol/base"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
"dubbo.apache.org/dubbo-go/v3/remoting"
Expand Down Expand Up @@ -237,6 +238,114 @@ affinityAware:

}

func TestAffinityRouteSetStaticConfig(t *testing.T) {
invokers := buildInvokers()
consumerURL := newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
inv := invocation.NewRPCInvocation("getComment", nil, nil)
cfg := &global.RouterConfig{
Scope: constant.RouterScopeService,
Key: "service.apache.com",
AffinityAware: global.AffinityAware{
Key: "region",
Ratio: 20,
},
}

a := &affinityRoute{}
a.SetStaticConfig(cfg)

got := a.Route(invokers, consumerURL, inv)
want := NewINVOKERS_FILTERS().add("region=$region").filtrate(invokers, consumerURL, inv)
assert.Equal(t, want, got)
}

func TestAffinityRouteSetStaticConfigIgnoresInvalidConfig(t *testing.T) {
invokers := buildInvokers()
consumerURL := newUrl("consumer://127.0.0.1/com.foo.BarService?env=gray&region=beijing")
inv := invocation.NewRPCInvocation("getComment", nil, nil)
enabled := false

tests := []struct {
name string
cfg *global.RouterConfig
}{
{
name: "nil config",
},
{
name: "disabled",
cfg: &global.RouterConfig{
Enabled: &enabled,
AffinityAware: global.AffinityAware{
Key: "region",
Ratio: 20,
},
},
},
{
name: "empty affinity key",
cfg: &global.RouterConfig{
AffinityAware: global.AffinityAware{Ratio: 20},
},
},
{
name: "bad ratio",
cfg: &global.RouterConfig{
AffinityAware: global.AffinityAware{
Key: "region",
Ratio: 101,
},
},
},
{
name: "invalid matcher key",
cfg: &global.RouterConfig{
AffinityAware: global.AffinityAware{
Key: "=",
Ratio: 20,
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := &affinityRoute{}
a.SetStaticConfig(tt.cfg)
assert.Equal(t, invokers, a.Route(invokers, consumerURL, inv))
})
}
}

func TestAffinityRouteSetStaticConfigScope(t *testing.T) {
cfg := &global.RouterConfig{
Scope: constant.RouterScopeService,
AffinityAware: global.AffinityAware{
Key: "region",
Ratio: 20,
},
}
serviceRouter := newServiceAffinityRoute()
serviceRouter.SetStaticConfig(nil)
assert.False(t, serviceRouter.enabled)

appScopedCfg := *cfg
appScopedCfg.Scope = constant.RouterScopeApplication
serviceRouter.SetStaticConfig(&appScopedCfg)
assert.False(t, serviceRouter.enabled)

serviceRouter.SetStaticConfig(cfg)
assert.True(t, serviceRouter.enabled)

appRouter := &ApplicationAffinityRoute{}
appRouter.SetStaticConfig(cfg)
assert.False(t, appRouter.enabled)

cfg.Scope = constant.RouterScopeApplication
appRouter.SetStaticConfig(cfg)
assert.True(t, appRouter.enabled)
}

func Test_newApplicationAffinityRouter(t *testing.T) {
u, _ := common.NewURL("condition://0.0.0.0/com.foo.BarService")
router := newApplicationAffinityRouter(u)
Expand Down
4 changes: 4 additions & 0 deletions compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ func compatRouterConfig(c *global.RouterConfig) *config.RouterConfig {
Tags: compatTags(c.Tags),
ScriptType: c.ScriptType,
Script: c.Script,
AffinityAware: config.AffinityAware{
Key: c.AffinityAware.Key,
Ratio: c.AffinityAware.Ratio,
},
}

}
Expand Down
16 changes: 16 additions & 0 deletions compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,19 @@ func TestCompatTraceIntegrationConfig(t *testing.T) {
assert.Nil(t, globalCfg)
})
}

func TestCompatRouterConfigWithAffinityAware(t *testing.T) {
router := &global.RouterConfig{
Key: "service.apache.com",
AffinityAware: global.AffinityAware{
Key: "region",
Ratio: 20,
},
}

got := compatRouterConfig(router)

assert.NotNil(t, got)
assert.Equal(t, router.AffinityAware.Key, got.AffinityAware.Key)
assert.Equal(t, router.AffinityAware.Ratio, got.AffinityAware.Ratio)
}
2 changes: 2 additions & 0 deletions config/router_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type RouterConfig struct {
Tags []Tag `yaml:"tags" json:"tags,omitempty" property:"tags"`
ScriptType string `yaml:"type" json:"type,omitempty" property:"type"`
Script string `yaml:"script" json:"script,omitempty" property:"script"`

AffinityAware AffinityAware `yaml:"affinityAware" json:"affinityAware,omitempty" property:"affinityAware"`
}

type Tag struct {
Expand Down
5 changes: 5 additions & 0 deletions global/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,10 @@ func TestRouterConfigClone(t *testing.T) {
Priority: 10,
ScriptType: "javascript",
Script: "1==1",
AffinityAware: AffinityAware{
Key: "region",
Ratio: 20,
},
Force: func() *bool {
b := true
return &b
Expand Down Expand Up @@ -872,6 +876,7 @@ func TestRouterConfigClone(t *testing.T) {
assert.Equal(t, router.Priority, cloned.Priority)
assert.Equal(t, router.ScriptType, cloned.ScriptType)
assert.Equal(t, router.Script, cloned.Script)
assert.Equal(t, router.AffinityAware, cloned.AffinityAware)
assert.NotSame(t, router, cloned)
assert.Len(t, cloned.Conditions, 2)
assert.Len(t, cloned.Tags, 2)
Expand Down
4 changes: 4 additions & 0 deletions global/router_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type RouterConfig struct {
Tags []Tag `yaml:"tags" json:"tags,omitempty" property:"tags"`
ScriptType string `yaml:"type" json:"type,omitempty" property:"type"`
Script string `yaml:"script" json:"script,omitempty" property:"script"`

AffinityAware AffinityAware `yaml:"affinityAware" json:"affinityAware,omitempty" property:"affinityAware"`
}

type Tag struct {
Expand Down Expand Up @@ -178,5 +180,7 @@ func (c *RouterConfig) Clone() *RouterConfig {
Tags: newTags,
ScriptType: c.ScriptType,
Script: c.Script,

AffinityAware: c.AffinityAware,
}
}
Loading