Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions agent/agents/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (C) 2023 Percona LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agents

import (
"crypto/rand"
"math/big"
"sync"
"time"
)

var minuteSchedule = newOffsetSchedule()

type offsetSchedule struct {
m sync.Mutex
offset map[string]int
counts map[int]int
}

func newOffsetSchedule() *offsetSchedule {
return &offsetSchedule{
offset: make(map[string]int),
counts: make(map[int]int),
}
}

// RandomMinuteOffset assigns a random delay within a minute.
// Offsets are unique while there are no more than 60 active agents.
func RandomMinuteOffset(agentID string) (time.Duration, func()) {
return minuteSchedule.assign(agentID, time.Minute)
}

// DelayUntilOffset returns the time left before the offset slot in the current interval.
func DelayUntilOffset(now time.Time, interval, offset time.Duration) time.Duration {
if interval <= 0 || offset <= 0 {
return 0
}

offset %= interval
periodStart := now.Truncate(interval)
target := periodStart.Add(offset)
if !target.After(now) {
return 0
}

return target.Sub(now)
}

func (s *offsetSchedule) assign(agentID string, interval time.Duration) (time.Duration, func()) {
s.m.Lock()
defer s.m.Unlock()

if offset, ok := s.offset[agentID]; ok {
return time.Duration(offset) * time.Second, func() {}
}

slots := int(interval / time.Second)
if slots <= 0 {
return 0, func() {}
}

offset := s.nextOffset(slots)
s.offset[agentID] = offset
s.counts[offset]++

var once sync.Once
release := func() {
once.Do(func() {
s.release(agentID)
})
}

return time.Duration(offset) * time.Second, release
}

func (s *offsetSchedule) nextOffset(slots int) int {
minCount := int(^uint(0) >> 1)
candidates := make([]int, 0, slots)
for i := range slots {
count := s.counts[i]
switch {
case count < minCount:
minCount = count
candidates = candidates[:0]
candidates = append(candidates, i)
case count == minCount:
candidates = append(candidates, i)
}
}

return candidates[randomInt(len(candidates))]
}

func (s *offsetSchedule) release(agentID string) {
s.m.Lock()
defer s.m.Unlock()

offset, ok := s.offset[agentID]
if !ok {
return
}

delete(s.offset, agentID)
s.counts[offset]--
if s.counts[offset] <= 0 {
delete(s.counts, offset)
}
}

func randomInt(n int) int {
v, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
if err != nil {
return int(time.Now().UnixNano() % int64(n))
}

return int(v.Int64())
}
152 changes: 152 additions & 0 deletions agent/agents/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (C) 2023 Percona LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package agents

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOffsetSchedule(t *testing.T) {
t.Run("assigns unique offsets while slots are available", func(t *testing.T) {
s := newOffsetSchedule()
seen := make(map[time.Duration]bool)

for i := range 60 {
offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute)
defer release()

require.False(t, seen[offset])
seen[offset] = true
}
})

t.Run("reuses offsets only after all slots are occupied", func(t *testing.T) {
s := newOffsetSchedule()
counts := make(map[time.Duration]int)

for i := range 61 {
offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute)
defer release()

counts[offset]++
}

assert.Len(t, counts, 60)
for _, count := range counts {
assert.LessOrEqual(t, count, 2)
}
})

t.Run("distributes offsets evenly", func(t *testing.T) {
for _, agentsCount := range []int{60, 120, 180, 181} {
s := newOffsetSchedule()
counts := make(map[time.Duration]int)

for i := range agentsCount {
offset, release := s.assign(fmt.Sprintf("agent-%d", i), time.Minute)
defer release()

counts[offset]++
}

require.Len(t, counts, 60)
minCount := agentsCount
maxCount := 0
for _, count := range counts {
minCount = min(minCount, count)
maxCount = max(maxCount, count)
}
assert.LessOrEqual(t, maxCount-minCount, 1)
}
})

t.Run("releases offsets", func(t *testing.T) {
s := newOffsetSchedule()
offset, release := s.assign("agent-1", time.Minute)

release()
assert.Empty(t, s.offset)
assert.Empty(t, s.counts)

newOffset, newRelease := s.assign("agent-2", time.Second)
defer newRelease()

assert.Equal(t, offset%time.Second, newOffset)
})
}

func TestDelayUntilOffset(t *testing.T) {
t.Parallel()

now := time.Date(2026, 6, 4, 18, 47, 10, 0, time.UTC)

tests := []struct {
name string
now time.Time
interval time.Duration
offset time.Duration
expected time.Duration
}{
{
name: "before offset",
now: now,
interval: time.Minute,
offset: 30 * time.Second,
expected: 20 * time.Second,
},
{
name: "at offset",
now: now.Add(20 * time.Second),
interval: time.Minute,
offset: 30 * time.Second,
expected: 0,
},
{
name: "after offset",
now: now.Add(21 * time.Second),
interval: time.Minute,
offset: 30 * time.Second,
expected: 0,
},
{
name: "offset outside interval",
now: now,
interval: time.Minute,
offset: 90 * time.Second,
expected: 20 * time.Second,
},
{
name: "non positive interval",
now: now,
interval: 0,
offset: 30 * time.Second,
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

actual := DelayUntilOffset(tt.now, tt.interval, tt.offset)
assert.Equal(t, tt.expected, actual)
})
}
}
69 changes: 66 additions & 3 deletions agent/agents/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,18 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState
go pprof.Do(ctx, pprof.Labels("agentID", agentID, "type", agentType), agent.Run)

go func() {
qanDeliveryRequests := make(chan *agentv1.QANCollectRequest, qanRequestsBufferSize)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qanRequestsBufferSize == only 100 items. It may become full very quickly if we postpone sending QAN requests to network on really loaded systems. And it results into the following:

  1. this go-routine will stop reading from agent.Changes()
  2. but this agent.Changes() is used for delivering agent's status changes as well.
  3. agent's status delivery will be postponed till QAN requests are processed

It looks like there is a need in moving agent's status change channel into a separate high priority channel that is not blocked by the rest channels.

qanDeliveryDone := make(chan struct{})
go func() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this func startBuiltin is common for QAN and RTA agents. This particular go-routine is created unconditionally for all agent types 9even when it is not required (RTA case) .

one more point - waiting for each such nested go-routine finish is performed via done channel pattern, isn't it better to have one workGroup instead channel per each go-routine?

defer close(qanDeliveryDone)
s.deliverQANRequests(ctx, l, agentID, qanDeliveryRequests)
}()
defer close(done)
defer func() {
close(qanDeliveryRequests)
<-qanDeliveryDone
}()

rtaBucketLastCollectTime := timestamppb.New(time.Now()).AsTime()

for change := range agent.Changes() {
Expand All @@ -700,10 +712,14 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState
}
}
if change.MetricsBucket != nil {
l.Infof("Sending %d metrics buckets.", len(change.MetricsBucket))
s.qanRequests <- &agentv1.QANCollectRequest{
request := &agentv1.QANCollectRequest{
MetricsBucket: change.MetricsBucket,
}
select {
case qanDeliveryRequests <- request:
case <-ctx.Done():
return
}
}

if len(change.RTAQueriesBucket) != 0 {
Expand All @@ -726,7 +742,6 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState
}
}
}
close(done)
}()

//nolint:forcetypeassert
Expand All @@ -741,6 +756,54 @@ func (s *Supervisor) startBuiltin(agentID string, builtinAgent *agentv1.SetState
return nil
}

func (s *Supervisor) deliverQANRequests(ctx context.Context, l *logrus.Entry, agentID string, requests <-chan *agentv1.QANCollectRequest) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an alternative via rateLimiter:

package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/time/rate"
)

// Allow exactly 10 requests per second, with a maximum burst of 1.
// This enforces a strict, even flow to the backend.
var backendLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

func HandleRequest(ctx context.Context, payload string) error {
	// 1. We wait for permission to proceed. 
	// If the rate is exceeded, this Goroutine goes to sleep (is parked)
	// and wakes up exactly when it's allowed to hit the backend.
	if err := backendLimiter.Wait(ctx); err != nil {
		// This error ONLY triggers if the context expires or is canceled 
		// before a token becomes available.
		return err 
	}

	// 2. We are cleared to go. Hit the backing service.
	return callBackingService(payload)
}

func callBackingService(payload string) error {
	// Simulate backend work
	return nil
}

you may define a time distribution factor in ..rate.Every(100*time.Millisecond).... It looks less complex

var qanDeliveryOffset time.Duration
qanDeliveryOffsetAssigned := false
var releaseQANDeliveryOffset func()

for request := range requests {
if !qanDeliveryOffsetAssigned {
var offset time.Duration
offset, releaseQANDeliveryOffset = agents.RandomMinuteOffset(agentID)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem with this approach - it is applied even when there is only 1 QAN agent because this logic is placed on Agent side that knows nothing about number of QAN agents registered on PMM server.

It still may create local spikes (yes, they will be not so critical in comparing to heavy-loaded systems) because it only shifts sending the whole collected/buffered QAN requests collection from 0-s second of each minute to N-s second of each minute.

qanDeliveryOffset = offset
qanDeliveryOffsetAssigned = true
}

delay := agents.DelayUntilOffset(time.Now(), time.Minute, qanDeliveryOffset)
s.sendQANRequest(ctx, l, request, delay)
}

if releaseQANDeliveryOffset != nil {
releaseQANDeliveryOffset()
}
}

func (s *Supervisor) sendQANRequest(ctx context.Context, l *logrus.Entry, request *agentv1.QANCollectRequest, delay time.Duration) bool {
if delay > 0 {
l.Debugf("Scheduling QAN delivery in %s.", delay)
t := time.NewTimer(delay)
select {
case <-t.C:
case <-ctx.Done():
if !t.Stop() {
select {
case <-t.C:
default:
}
}
return false
}
}

l.Infof("Sending %d metrics buckets.", len(request.MetricsBucket))
select {
case s.qanRequests <- request:
return true
case <-ctx.Done():
return false
}
}

// agentLogger write logs to Store so can get last N.
func (s *Supervisor) agentLogger(logStore *tailog.Store) *logrus.Logger {
return &logrus.Logger{
Expand Down
Loading
Loading