diff --git a/agent/agents/scheduler.go b/agent/agents/scheduler.go new file mode 100644 index 00000000000..f7b95264d10 --- /dev/null +++ b/agent/agents/scheduler.go @@ -0,0 +1,129 @@ +// Copyright (C) 2023 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agents + +import ( + "crypto/rand" + "math/big" + "sync" + "time" +) + +var minuteSchedule = newOffsetSchedule() + +type offsetSchedule struct { + m sync.Mutex + offset map[string]int + counts map[int]int +} + +func newOffsetSchedule() *offsetSchedule { + return &offsetSchedule{ + offset: make(map[string]int), + counts: make(map[int]int), + } +} + +// RandomMinuteOffset assigns a random delay within a minute. +// Offsets are unique while there are no more than 60 active agents. +func RandomMinuteOffset(agentID string) (time.Duration, func()) { + return minuteSchedule.assign(agentID, time.Minute) +} + +// DelayUntilOffset returns the time left before the offset slot in the current interval. +func DelayUntilOffset(now time.Time, interval, offset time.Duration) time.Duration { + if interval <= 0 || offset <= 0 { + return 0 + } + + offset %= interval + periodStart := now.Truncate(interval) + target := periodStart.Add(offset) + if !target.After(now) { + return 0 + } + + return target.Sub(now) +} + +func (s *offsetSchedule) assign(agentID string, interval time.Duration) (time.Duration, func()) { + s.m.Lock() + defer s.m.Unlock() + + if offset, ok := s.offset[agentID]; ok { + return time.Duration(offset) * time.Second, func() {} + } + + slots := int(interval / time.Second) + if slots <= 0 { + return 0, func() {} + } + + offset := s.nextOffset(slots) + s.offset[agentID] = offset + s.counts[offset]++ + + var once sync.Once + release := func() { + once.Do(func() { + s.release(agentID) + }) + } + + return time.Duration(offset) * time.Second, release +} + +func (s *offsetSchedule) nextOffset(slots int) int { + minCount := int(^uint(0) >> 1) + candidates := make([]int, 0, slots) + for i := range slots { + count := s.counts[i] + switch { + case count < minCount: + minCount = count + candidates = candidates[:0] + candidates = append(candidates, i) + case count == minCount: + candidates = append(candidates, i) + } + } + + return candidates[randomInt(len(candidates))] +} + +func (s *offsetSchedule) release(agentID string) { + s.m.Lock() + defer s.m.Unlock() + + offset, ok := s.offset[agentID] + if !ok { + return + } + + delete(s.offset, agentID) + s.counts[offset]-- + if s.counts[offset] <= 0 { + delete(s.counts, offset) + } +} + +func randomInt(n int) int { + v, err := rand.Int(rand.Reader, big.NewInt(int64(n))) + if err != nil { + return int(time.Now().UnixNano() % int64(n)) + } + + return int(v.Int64()) +} diff --git a/agent/agents/scheduler_test.go b/agent/agents/scheduler_test.go new file mode 100644 index 00000000000..31ae1b6a876 --- /dev/null +++ b/agent/agents/scheduler_test.go @@ -0,0 +1,152 @@ +// Copyright (C) 2023 Percona LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package agents + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOffsetSchedule(t *testing.T) { + t.Run("assigns unique offsets while slots are available", func(t *testing.T) { + s := newOffsetSchedule() + seen := make(map[time.Duration]bool) + + for i := range 60 { + offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute) + defer release() + + require.False(t, seen[offset]) + seen[offset] = true + } + }) + + t.Run("reuses offsets only after all slots are occupied", func(t *testing.T) { + s := newOffsetSchedule() + counts := make(map[time.Duration]int) + + for i := range 61 { + offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute) + defer release() + + counts[offset]++ + } + + assert.Len(t, counts, 60) + for _, count := range counts { + assert.LessOrEqual(t, count, 2) + } + }) + + t.Run("distributes offsets evenly", func(t *testing.T) { + for _, agentsCount := range []int{60, 120, 180, 181} { + s := newOffsetSchedule() + counts := make(map[time.Duration]int) + + for i := range agentsCount { + offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute) + defer release() + + counts[offset]++ + } + + require.Len(t, counts, 60) + minCount := agentsCount + maxCount := 0 + for _, count := range counts { + minCount = min(minCount, count) + maxCount = max(maxCount, count) + } + assert.LessOrEqual(t, maxCount-minCount, 1) + } + }) + + t.Run("releases offsets", func(t *testing.T) { + s := newOffsetSchedule() + offset, release := s.assign("agent-1", time.Minute) + + release() + assert.Empty(t, s.offset) + assert.Empty(t, s.counts) + + newOffset, newRelease := s.assign("agent-2", time.Second) + defer newRelease() + + assert.Equal(t, offset%time.Second, newOffset) + }) +} + +func TestDelayUntilOffset(t *testing.T) { + t.Parallel() + + now := time.Date(2026, 6, 4, 18, 47, 10, 0, time.UTC) + + tests := []struct { + name string + now time.Time + interval time.Duration + offset time.Duration + expected time.Duration + }{ + { + name: "before offset", + now: now, + interval: time.Minute, + offset: 30 * time.Second, + expected: 20 * time.Second, + }, + { + name: "at offset", + now: now.Add(20 * time.Second), + interval: time.Minute, + offset: 30 * time.Second, + expected: 0, + }, + { + name: "after offset", + now: now.Add(21 * time.Second), + interval: time.Minute, + offset: 30 * time.Second, + expected: 0, + }, + { + name: "offset outside interval", + now: now, + interval: time.Minute, + offset: 90 * time.Second, + expected: 20 * time.Second, + }, + { + name: "non positive interval", + now: now, + interval: 0, + offset: 30 * time.Second, + expected: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual := DelayUntilOffset(tt.now, tt.interval, tt.offset) + assert.Equal(t, tt.expected, actual) + }) + } +} diff --git a/agent/agents/supervisor/supervisor.go b/agent/agents/supervisor/supervisor.go index 139724ad135..7c25e9f71d0 100644 --- a/agent/agents/supervisor/supervisor.go +++ b/agent/agents/supervisor/supervisor.go @@ -688,6 +688,18 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState go pprof.Do(ctx, pprof.Labels("agentID", agentID, "type", agentType), agent.Run) go func() { + qanDeliveryRequests := make(chan *agentv1.QANCollectRequest, qanRequestsBufferSize) + qanDeliveryDone := make(chan struct{}) + go func() { + defer close(qanDeliveryDone) + s.deliverQANRequests(ctx, l, agentID, qanDeliveryRequests) + }() + defer close(done) + defer func() { + close(qanDeliveryRequests) + <-qanDeliveryDone + }() + rtaBucketLastCollectTime := timestamppb.New(time.Now()).AsTime() for change := range agent.Changes() { @@ -700,10 +712,14 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState } } if change.MetricsBucket != nil { - l.Infof("Sending %d metrics buckets.", len(change.MetricsBucket)) - s.qanRequests <- &agentv1.QANCollectRequest{ + request := &agentv1.QANCollectRequest{ MetricsBucket: change.MetricsBucket, } + select { + case qanDeliveryRequests <- request: + case <-ctx.Done(): + return + } } if len(change.RTAQueriesBucket) != 0 { @@ -726,7 +742,6 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState } } } - close(done) }() //nolint:forcetypeassert @@ -741,6 +756,54 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState return nil } +func (s *Supervisor) deliverQANRequests(ctx context.Context, l *logrus.Entry, agentID string, requests <-chan *agentv1.QANCollectRequest) { + var qanDeliveryOffset time.Duration + qanDeliveryOffsetAssigned := false + var releaseQANDeliveryOffset func() + + for request := range requests { + if !qanDeliveryOffsetAssigned { + var offset time.Duration + offset, releaseQANDeliveryOffset = agents.RandomMinuteOffset(agentID) + qanDeliveryOffset = offset + qanDeliveryOffsetAssigned = true + } + + delay := agents.DelayUntilOffset(time.Now(), time.Minute, qanDeliveryOffset) + s.sendQANRequest(ctx, l, request, delay) + } + + if releaseQANDeliveryOffset != nil { + releaseQANDeliveryOffset() + } +} + +func (s *Supervisor) sendQANRequest(ctx context.Context, l *logrus.Entry, request *agentv1.QANCollectRequest, delay time.Duration) bool { + if delay > 0 { + l.Debugf("Scheduling QAN delivery in %s.", delay) + t := time.NewTimer(delay) + select { + case <-t.C: + case <-ctx.Done(): + if !t.Stop() { + select { + case <-t.C: + default: + } + } + return false + } + } + + l.Infof("Sending %d metrics buckets.", len(request.MetricsBucket)) + select { + case s.qanRequests <- request: + return true + case <-ctx.Done(): + return false + } +} + // agentLogger write logs to Store so can get last N. func (s *Supervisor) agentLogger(logStore *tailog.Store) *logrus.Logger { return &logrus.Logger{ diff --git a/agent/agents/supervisor/supervisor_test.go b/agent/agents/supervisor/supervisor_test.go index 3ced9eef431..144b58fda59 100644 --- a/agent/agents/supervisor/supervisor_test.go +++ b/agent/agents/supervisor/supervisor_test.go @@ -19,7 +19,9 @@ import ( "os" "path/filepath" "testing" + "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -319,6 +321,52 @@ func TestFilter(t *testing.T) { assert.Equal(t, []string{"toStop"}, toStop) } +func TestSupervisorSendQANRequest(t *testing.T) { + t.Parallel() + + request := &agentv1.QANCollectRequest{ + MetricsBucket: []*agentv1.MetricsBucket{{}}, + } + l := logrus.NewEntry(logrus.New()) + + t.Run("sends without delay", func(t *testing.T) { + t.Parallel() + + s := &Supervisor{ + qanRequests: make(chan *agentv1.QANCollectRequest, 1), + } + + sent := s.sendQANRequest(t.Context(), l, request, 0) + require.True(t, sent) + assert.Same(t, request, <-s.QANRequests()) + }) + + t.Run("cancels pending delivery", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(t.Context()) + s := &Supervisor{ + qanRequests: make(chan *agentv1.QANCollectRequest, 1), + } + + done := make(chan bool, 1) + go func() { + done <- s.sendQANRequest(ctx, l, request, time.Hour) + }() + + cancel() + + select { + case sent := <-done: + assert.False(t, sent) + case <-time.After(time.Second): + t.Fatal("sendQANRequest did not stop after context cancellation") + } + + assert.Empty(t, s.qanRequests) + }) +} + func TestSupervisorProcessParams(t *testing.T) { t.Parallel() setup := func(t *testing.T) (*Supervisor, func()) {