From 615ef9d0b77074d1812d7a16359925407d910695 Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Mon, 14 Apr 2025 16:34:38 +0200 Subject: [PATCH 1/4] Add failing unit test --- pkg/servers/sflow/aggregator.go | 4 +- pkg/servers/sflow/aggregator_test.go | 55 ++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 pkg/servers/sflow/aggregator_test.go diff --git a/pkg/servers/sflow/aggregator.go b/pkg/servers/sflow/aggregator.go index d7c86f7..6009d55 100644 --- a/pkg/servers/sflow/aggregator.go +++ b/pkg/servers/sflow/aggregator.go @@ -17,6 +17,7 @@ type aggregator struct { ingress chan *flow.Flow output chan []*flow.Flow currentUnixTimeSeconds int64 + timeNow func() time.Time } func newAggregator(output chan []*flow.Flow) *aggregator { @@ -25,6 +26,7 @@ func newAggregator(output chan []*flow.Flow) *aggregator { stopCh: make(chan struct{}), ingress: make(chan *flow.Flow), output: output, + timeNow: time.Now, } go a.service() @@ -75,7 +77,7 @@ func (a *aggregator) service() { } func (a *aggregator) ingest(fl *flow.Flow) { - currentUnixTimeSeconds := time.Now().Unix() + currentUnixTimeSeconds := a.timeNow().Unix() currentUnixTimeSeconds -= currentUnixTimeSeconds % aggregationWindowSeconds if a.currentUnixTimeSeconds < currentUnixTimeSeconds { a.flush() diff --git a/pkg/servers/sflow/aggregator_test.go b/pkg/servers/sflow/aggregator_test.go new file mode 100644 index 0000000..b1640a8 --- /dev/null +++ b/pkg/servers/sflow/aggregator_test.go @@ -0,0 +1,55 @@ +package sflow + +import ( + "testing" + "time" + + "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/flowhouse/pkg/models/flow" + "github.com/stretchr/testify/assert" +) + +func exampleFlow(t testing.TB, ts time.Time) *flow.Flow { + return &flow.Flow{ + Agent: must[net.IP](t)(net.IPFromString("2001:db8::1")), + SrcPort: 34567, + DstPort: 443, + Packets: 10, + Protocol: 6, + Family: 4, + Timestamp: ts.Unix(), + Size: 200, + SrcAddr: must[net.IP](t)(net.IPFromString("198.51.100.24")), + DstAddr: must[net.IP](t)(net.IPFromString("203.0.113.30")), + } +} + +func TestAggregatorBuffering(t *testing.T) { + mockedTime := time.Now() + + out := make(chan []*flow.Flow, 10) + agg := newAggregator(out) + agg.timeNow = func() time.Time { return mockedTime } + + agg.ingest(exampleFlow(t, mockedTime)) + assert.Len(t, out, 0) // should not have flushed + + // advance time by 2 seconds + mockedTime = mockedTime.Add(2 * time.Second) + + agg.ingest(exampleFlow(t, mockedTime)) + assert.Len(t, out, 0) // should not have flushed + + // advance time by 10 seconds + agg.ingest(exampleFlow(t, mockedTime)) + assert.Len(t, out, 1) // should have flushed one record +} + +func must[T any](t testing.TB) func(res T, err error) T { + return func(res T, err error) T { + if err != nil { + t.Error(err) + } + return res + } +} From 191adb31dcb8cfa64ca433a33f18bdd39e756143 Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Mon, 14 Apr 2025 16:59:42 +0200 Subject: [PATCH 2/4] Allow empty flush in the beginning --- pkg/servers/sflow/aggregator_test.go | 35 ++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/pkg/servers/sflow/aggregator_test.go b/pkg/servers/sflow/aggregator_test.go index b1640a8..5d02894 100644 --- a/pkg/servers/sflow/aggregator_test.go +++ b/pkg/servers/sflow/aggregator_test.go @@ -7,6 +7,7 @@ import ( "github.com/bio-routing/bio-rd/net" "github.com/bio-routing/flowhouse/pkg/models/flow" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func exampleFlow(t testing.TB, ts time.Time) *flow.Flow { @@ -25,24 +26,48 @@ func exampleFlow(t testing.TB, ts time.Time) *flow.Flow { } func TestAggregatorBuffering(t *testing.T) { - mockedTime := time.Now() + initialTime := time.Now() + mockedTime := initialTime out := make(chan []*flow.Flow, 10) agg := newAggregator(out) agg.timeNow = func() time.Time { return mockedTime } agg.ingest(exampleFlow(t, mockedTime)) - assert.Len(t, out, 0) // should not have flushed + // should have flushed an empty list at the beginning + select { + case flows := <-out: + assert.Empty(t, flows) + default: + t.Error("no flows in channel") + } - // advance time by 2 seconds - mockedTime = mockedTime.Add(2 * time.Second) + // advance time by 5 seconds + mockedTime = mockedTime.Add(5 * time.Second) agg.ingest(exampleFlow(t, mockedTime)) assert.Len(t, out, 0) // should not have flushed // advance time by 10 seconds + mockedTime = mockedTime.Add(10 * time.Second) + agg.ingest(exampleFlow(t, mockedTime)) - assert.Len(t, out, 1) // should have flushed one record + assert.Len(t, out, 1) // should have flushed once + + // check flushed flows + { + select { + case flows := <-out: + require.Len(t, flows, 1) + fl := flows[0] + assert.Equal(t, + initialTime.Add(15*time.Second).Truncate(10*time.Second).Unix(), + fl.Timestamp, + ) + default: + t.Error("no flow available") + } + } } func must[T any](t testing.TB) func(res T, err error) T { From 4b8c2b49c208996341aae950ade627712e2d9b7d Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Mon, 14 Apr 2025 17:18:01 +0200 Subject: [PATCH 3/4] Improve test flakiness --- pkg/servers/sflow/aggregator_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/servers/sflow/aggregator_test.go b/pkg/servers/sflow/aggregator_test.go index 5d02894..c72551f 100644 --- a/pkg/servers/sflow/aggregator_test.go +++ b/pkg/servers/sflow/aggregator_test.go @@ -26,7 +26,8 @@ func exampleFlow(t testing.TB, ts time.Time) *flow.Flow { } func TestAggregatorBuffering(t *testing.T) { - initialTime := time.Now() + // align initial time to avoid test flakiness + initialTime := time.Now().Truncate(10 * time.Second) mockedTime := initialTime out := make(chan []*flow.Flow, 10) @@ -42,8 +43,8 @@ func TestAggregatorBuffering(t *testing.T) { t.Error("no flows in channel") } - // advance time by 5 seconds - mockedTime = mockedTime.Add(5 * time.Second) + // advance time by 2 seconds + mockedTime = mockedTime.Add(2 * time.Second) agg.ingest(exampleFlow(t, mockedTime)) assert.Len(t, out, 0) // should not have flushed @@ -61,7 +62,7 @@ func TestAggregatorBuffering(t *testing.T) { require.Len(t, flows, 1) fl := flows[0] assert.Equal(t, - initialTime.Add(15*time.Second).Truncate(10*time.Second).Unix(), + initialTime.Truncate(10*time.Second).Unix(), fl.Timestamp, ) default: From 65e6b59cfac24d8532a5f8245a1d83fa4570f4ca Mon Sep 17 00:00:00 2001 From: Marcus Weiner Date: Mon, 14 Apr 2025 17:22:49 +0200 Subject: [PATCH 4/4] Refactor sflow flush logic --- pkg/servers/sflow/aggregator.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/servers/sflow/aggregator.go b/pkg/servers/sflow/aggregator.go index 6009d55..0fb07eb 100644 --- a/pkg/servers/sflow/aggregator.go +++ b/pkg/servers/sflow/aggregator.go @@ -8,16 +8,16 @@ import ( ) const ( - aggregationWindowSeconds = 10 + aggregationWindow = 10 * time.Second ) type aggregator struct { - data map[key]*flow.Flow - stopCh chan struct{} - ingress chan *flow.Flow - output chan []*flow.Flow - currentUnixTimeSeconds int64 - timeNow func() time.Time + data map[key]*flow.Flow + stopCh chan struct{} + ingress chan *flow.Flow + output chan []*flow.Flow + lastFlush time.Time + timeNow func() time.Time } func newAggregator(output chan []*flow.Flow) *aggregator { @@ -77,14 +77,15 @@ func (a *aggregator) service() { } func (a *aggregator) ingest(fl *flow.Flow) { - currentUnixTimeSeconds := a.timeNow().Unix() - currentUnixTimeSeconds -= currentUnixTimeSeconds % aggregationWindowSeconds - if a.currentUnixTimeSeconds < currentUnixTimeSeconds { + normalizedIngestTime := a.timeNow().Truncate(aggregationWindow) + + timeSinceLastFlush := normalizedIngestTime.Sub(a.lastFlush) + if timeSinceLastFlush >= aggregationWindow { a.flush() - a.currentUnixTimeSeconds = currentUnixTimeSeconds + a.lastFlush = normalizedIngestTime } - fl.Timestamp = currentUnixTimeSeconds + fl.Timestamp = normalizedIngestTime.Unix() a.add(fl) }