diff --git a/fd/file.d.go b/fd/file.d.go index e071c1acf..f11e07d98 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -63,7 +63,7 @@ func (f *FileD) Start() { } func (f *FileD) initMetrics() { - f.metricCtl = metric.NewCtl("file_d", f.registry, 0, 0) + f.metricCtl = metric.NewCtl("file_d", f.registry, 0, 0, 0) f.versionMetric = f.metricCtl.RegisterGaugeVec("version", "", "version") f.versionMetric.WithLabelValues(buildinfo.Version).Inc() } diff --git a/fd/util.go b/fd/util.go index 9aa5af788..ba934045c 100644 --- a/fd/util.go +++ b/fd/util.go @@ -36,6 +36,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { var antispamRules antispam.Rules metricHoldDuration := pipeline.DefaultMetricHoldDuration + metricFlushInterval := pipeline.DefaultMetricFlushInterval metricMaxLabelValueLength := pipeline.DefaultMetricMaxLabelValueLength if settings != nil { @@ -146,6 +147,15 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { metricHoldDuration = i } + str = metrics.Get("flush_interval").MustString() + if str != "" { + i, err := time.ParseDuration(str) + if err != nil { + logger.Fatalf("can't parse pipeline metric hold duration: %s", err.Error()) + } + metricFlushInterval = i + } + metricMaxLabelValueLength = metrics.Get("max_label_value_length").MustInt() if metricMaxLabelValueLength < 0 { logger.Warn("negative max_label_value_length value, metric label truncation is disabled") @@ -176,6 +186,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { Pool: pipeline.PoolType(pool), Metric: &pipeline.MetricSettings{ HoldDuration: metricHoldDuration, + FlushInterval: metricFlushInterval, MaxLabelValueLength: metricMaxLabelValueLength, }, } diff --git a/metric/buffer.go b/metric/buffer.go new file mode 100644 index 000000000..c339409a0 --- /dev/null +++ b/metric/buffer.go @@ -0,0 +1,87 @@ +package metric + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type counterBuffer struct 
{ + mu sync.Mutex + delta float64 +} + +func (b *counterBuffer) add(v float64) { + b.mu.Lock() + b.delta += v + b.mu.Unlock() +} + +func (b *counterBuffer) flush(c prometheus.Counter) { + b.mu.Lock() + d := b.delta + b.delta = 0 + b.mu.Unlock() + if d > 0 { + c.Add(d) + } +} + +type gaugeBuffer struct { + mu sync.Mutex + hasSet bool + setValue float64 + delta float64 +} + +func (b *gaugeBuffer) set(v float64) { + b.mu.Lock() + b.hasSet, b.setValue, b.delta = true, v, 0 + b.mu.Unlock() +} + +func (b *gaugeBuffer) add(v float64) { + b.mu.Lock() + b.delta += v + b.mu.Unlock() +} + +func (b *gaugeBuffer) flush(g prometheus.Gauge) { + b.mu.Lock() + hasSet, setValue, delta := b.hasSet, b.setValue, b.delta + b.hasSet, b.setValue, b.delta = false, 0, 0 + b.mu.Unlock() + + switch { + case hasSet: + g.Set(setValue + delta) + case delta != 0: + g.Add(delta) + } +} + +type histogramBuffer struct { + mu sync.Mutex + pending []float64 +} + +func (b *histogramBuffer) observe(v float64) { + b.mu.Lock() + b.pending = append(b.pending, v) + b.mu.Unlock() +} + +func (b *histogramBuffer) flush(h prometheus.Histogram) { + b.mu.Lock() + if len(b.pending) == 0 { + b.mu.Unlock() + return + } + obs := b.pending + b.pending = make([]float64, 0, len(obs)) + b.mu.Unlock() + + for _, v := range obs { + h.Observe(v) + } +} diff --git a/metric/controller.go b/metric/controller.go index 2d212fcc1..f84ec3433 100644 --- a/metric/controller.go +++ b/metric/controller.go @@ -27,35 +27,53 @@ type Ctl struct { mu sync.RWMutex } -func NewCtl(subsystem string, registry *prometheus.Registry, metricHoldDuration time.Duration, metricMaxLabelValueLength int) *Ctl { +func NewCtl( + subsystem string, + registry *prometheus.Registry, + metricHoldDuration time.Duration, + metricFlushInterval time.Duration, + metricMaxLabelValueLength int, +) *Ctl { ctl := &Ctl{ subsystem: subsystem, register: registry, metrics: make(map[string]prometheus.Collector), metricMaxLabelValueLength: metricMaxLabelValueLength, } - if 
metricHoldDuration != 0 { - ctl.holder = NewHolder(metricHoldDuration) + ctl.holder = NewHolder(metricHoldDuration, metricFlushInterval) } - return ctl } -func (mc *Ctl) Maintenance() { - if mc.holder == nil { - return +func (mc *Ctl) Start() { + if mc.holder != nil { + mc.holder.start() } +} - mc.holder.maintenance() +func (mc *Ctl) Stop() { + if mc.holder != nil { + mc.holder.stop() + } } -func (mc *Ctl) AddToHolder(mv heldMetricVec) { - if mc.holder == nil { - return +func (mc *Ctl) Flush() { + if mc.holder != nil { + mc.holder.flush() + } +} + +func (mc *Ctl) Maintenance() { + if mc.holder != nil { + mc.holder.maintenance() } +} - mc.holder.addMetricVec(mv) +func (mc *Ctl) AddToHolder(mv heldMetricVec) { + if mc.holder != nil { + mc.holder.addMetricVec(mv) + } } func (mc *Ctl) RegisterCounter(name, help string) *Counter { @@ -142,6 +160,5 @@ func (mc *Ctl) registerMetric(name string, newMetric prometheus.Collector) prome mc.metrics[name] = metric mc.register.MustRegister(metric) } - return metric } diff --git a/metric/counter.go b/metric/counter.go index e7a3ebb51..58dbf3131 100644 --- a/metric/counter.go +++ b/metric/counter.go @@ -18,12 +18,20 @@ func newCounter(c prometheus.Counter) *Counter { } func (c *Counter) Inc() { - c.metric.Inc() + if buf := c.buffer.Load(); buf != nil { + buf.(*counterBuffer).add(1) + } else { + c.metric.Inc() + } c.updateUsage() } func (c *Counter) Add(v float64) { - c.metric.Add(v) + if buf := c.buffer.Load(); buf != nil { + buf.(*counterBuffer).add(v) + } else { + c.metric.Add(v) + } c.updateUsage() } @@ -44,6 +52,12 @@ func newCounterVec(cv *prometheus.CounterVec, maxLabelValueLength int) *CounterV } } +func (cv *CounterVec) enableBuffering() { + cv.store.enableBuffering(func() any { + return &counterBuffer{} + }) +} + func (cv *CounterVec) WithLabelValues(lvs ...string) *Counter { return &Counter{ heldMetric: cv.store.GetOrCreate(lvs, cv.vec.WithLabelValues), @@ -57,3 +71,9 @@ func (cv *CounterVec) DeleteLabelValues(lvs 
...string) bool { func (cv *CounterVec) DeleteOldMetrics(holdDuration time.Duration) { cv.store.DeleteOldMetrics(holdDuration, cv.vec) } + +func (cv *CounterVec) flush() { + cv.store.flushAll(func(hm *heldMetric[prometheus.Counter]) { + hm.buffer.Load().(*counterBuffer).flush(hm.metric) + }) +} diff --git a/metric/gauge.go b/metric/gauge.go index 2d3a55161..e4dbd8760 100644 --- a/metric/gauge.go +++ b/metric/gauge.go @@ -11,40 +11,60 @@ type Gauge struct { *heldMetric[prometheus.Gauge] } -func newGauge(c prometheus.Gauge) *Gauge { +func newGauge(g prometheus.Gauge) *Gauge { return &Gauge{ - heldMetric: newHeldMetric(nil, c), + heldMetric: newHeldMetric(nil, g), } } func (g *Gauge) Set(v float64) { - g.metric.Set(v) + if buf := g.buffer.Load(); buf != nil { + buf.(*gaugeBuffer).set(v) + } else { + g.metric.Set(v) + } g.updateUsage() } func (g *Gauge) Inc() { - g.metric.Inc() + if buf := g.buffer.Load(); buf != nil { + buf.(*gaugeBuffer).add(1) + } else { + g.metric.Inc() + } g.updateUsage() } func (g *Gauge) Dec() { - g.metric.Dec() + if buf := g.buffer.Load(); buf != nil { + buf.(*gaugeBuffer).add(-1) + } else { + g.metric.Dec() + } g.updateUsage() } func (g *Gauge) Add(v float64) { - g.metric.Add(v) + if buf := g.buffer.Load(); buf != nil { + buf.(*gaugeBuffer).add(v) + } else { + g.metric.Add(v) + } g.updateUsage() } func (g *Gauge) Sub(v float64) { - g.metric.Sub(v) + if buf := g.buffer.Load(); buf != nil { + buf.(*gaugeBuffer).add(-v) + } else { + g.metric.Sub(v) + } g.updateUsage() } // should only be used in tests -func (c *Gauge) ToFloat64() float64 { - return testutil.ToFloat64(c.metric) +func (g *Gauge) ToFloat64() float64 { + return testutil.ToFloat64(g.metric) } type GaugeVec struct { @@ -59,16 +79,26 @@ func newGaugeVec(gv *prometheus.GaugeVec, maxLabelValueLength int) *GaugeVec { } } +func (gv *GaugeVec) enableBuffering() { + gv.store.enableBuffering(func() any { return &gaugeBuffer{} }) +} + func (gv *GaugeVec) WithLabelValues(lvs ...string) *Gauge { 
return &Gauge{ heldMetric: gv.store.GetOrCreate(lvs, gv.vec.WithLabelValues), } } -func (cv *GaugeVec) DeleteLabelValues(lvs ...string) bool { - return cv.store.Delete(lvs, cv.vec) +func (gv *GaugeVec) DeleteLabelValues(lvs ...string) bool { + return gv.store.Delete(lvs, gv.vec) } func (gv *GaugeVec) DeleteOldMetrics(holdDuration time.Duration) { gv.store.DeleteOldMetrics(holdDuration, gv.vec) } + +func (gv *GaugeVec) flush() { + gv.store.flushAll(func(hm *heldMetric[prometheus.Gauge]) { + hm.buffer.Load().(*gaugeBuffer).flush(hm.metric) + }) +} diff --git a/metric/histogram.go b/metric/histogram.go index 5b9312171..2cf42fea0 100644 --- a/metric/histogram.go +++ b/metric/histogram.go @@ -10,14 +10,18 @@ type Histogram struct { *heldMetric[prometheus.Histogram] } -func newHistogram(c prometheus.Histogram) *Histogram { +func newHistogram(h prometheus.Histogram) *Histogram { return &Histogram{ - heldMetric: newHeldMetric(nil, c), + heldMetric: newHeldMetric(nil, h), } } func (h *Histogram) Observe(v float64) { - h.metric.Observe(v) + if buf := h.buffer.Load(); buf != nil { + buf.(*histogramBuffer).observe(v) + } else { + h.metric.Observe(v) + } h.updateUsage() } @@ -33,6 +37,10 @@ func newHistogramVec(hv *prometheus.HistogramVec, maxLabelValueLength int) *Hist } } +func (hv *HistogramVec) enableBuffering() { + hv.store.enableBuffering(func() any { return &histogramBuffer{} }) +} + func (hv *HistogramVec) WithLabelValues(lvs ...string) *Histogram { return &Histogram{ heldMetric: hv.store.GetOrCreate(lvs, func(s ...string) prometheus.Histogram { @@ -41,10 +49,16 @@ func (hv *HistogramVec) WithLabelValues(lvs ...string) *Histogram { } } -func (cv *HistogramVec) DeleteLabelValues(lvs ...string) bool { - return cv.store.Delete(lvs, cv.vec) +func (hv *HistogramVec) DeleteLabelValues(lvs ...string) bool { + return hv.store.Delete(lvs, hv.vec) } func (hv *HistogramVec) DeleteOldMetrics(holdDuration time.Duration) { hv.store.DeleteOldMetrics(holdDuration, hv.vec) } + +func (hv 
*HistogramVec) flush() { + hv.store.flushAll(func(hm *heldMetric[prometheus.Histogram]) { + hm.buffer.Load().(*histogramBuffer).flush(hm.metric) + }) +} diff --git a/metric/holder.go b/metric/holder.go index 2ffb1277a..6dd97eac1 100644 --- a/metric/holder.go +++ b/metric/holder.go @@ -6,21 +6,62 @@ import ( type heldMetricVec interface { DeleteOldMetrics(holdDuration time.Duration) + enableBuffering() + flush() } type Holder struct { - holdDuration time.Duration - heldMetrics []heldMetricVec + holdDuration time.Duration + flushInterval time.Duration + heldMetrics []heldMetricVec + + stopCh chan struct{} } -// NewHolder returns new metric holder. The holdDuration must be more than 1m. -func NewHolder(holdDuration time.Duration) *Holder { +func NewHolder(holdDuration, flushInterval time.Duration) *Holder { if holdDuration < time.Minute { panic("hold duration must be greater than 1m") } return &Holder{ - holdDuration: holdDuration, - heldMetrics: make([]heldMetricVec, 0), + holdDuration: holdDuration, + flushInterval: flushInterval, + heldMetrics: make([]heldMetricVec, 0), + stopCh: make(chan struct{}), + } +} + +func (h *Holder) start() { + if h.flushInterval == 0 { + return + } + go h.flushLoop() +} + +func (h *Holder) stop() { + close(h.stopCh) + h.flushBuffers() +} + +func (h *Holder) flush() { + h.flushBuffers() +} + +func (h *Holder) flushLoop() { + ticker := time.NewTicker(h.flushInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + h.flushBuffers() + case <-h.stopCh: + return + } + } +} + +func (h *Holder) flushBuffers() { + for _, mv := range h.heldMetrics { + mv.flush() } } @@ -29,10 +70,12 @@ func (h *Holder) maintenance() { } func (h *Holder) addMetricVec(mv heldMetricVec) { + if h.flushInterval > 0 { + mv.enableBuffering() + } h.heldMetrics = append(h.heldMetrics, mv) } -// DeleteOldMetrics delete old metric labels, that aren't in use since last update. 
func (h *Holder) deleteOldMetrics() { for i := range h.heldMetrics { h.heldMetrics[i].DeleteOldMetrics(h.holdDuration) diff --git a/metric/metric.go b/metric/metric.go index 76c0d05ed..bf477d153 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -16,6 +16,7 @@ type heldMetric[T prometheus.Metric] struct { labels []string lastUsage atomic.Int64 // unixnano timestamp metric T + buffer atomic.Value } func newHeldMetric[T prometheus.Metric](labels []string, metric T) *heldMetric[T] { @@ -44,6 +45,7 @@ type heldMetricsStore[T prometheus.Metric] struct { mu sync.RWMutex metricsByHash map[uint64][]*heldMetric[T] metricMaxLabelValueLength int + newBuffer func() any } func newHeldMetricsStore[T prometheus.Metric](metricMaxLabelValueLength int) *heldMetricsStore[T] { @@ -54,6 +56,19 @@ func newHeldMetricsStore[T prometheus.Metric](metricMaxLabelValueLength int) *he } } +func (h *heldMetricsStore[T]) enableBuffering(newBuffer func() any) { + h.mu.Lock() + defer h.mu.Unlock() + h.newBuffer = newBuffer + for _, metrics := range h.metricsByHash { + for _, hm := range metrics { + if hm.buffer.Load() == nil { + hm.buffer.Store(newBuffer()) + } + } + } +} + func (h *heldMetricsStore[T]) GetOrCreate(labels []string, newPromMetric func(...string) T) *heldMetric[T] { h.truncateLabels(labels) hash := computeStringsHash(labels) @@ -91,9 +106,11 @@ func (h *heldMetricsStore[T]) Delete(labels []string, deleter metricDeleter) boo if len(hMetrics) == 0 { delete(h.metricsByHash, hash) + } else { + h.metricsByHash[hash] = hMetrics } - return ok + return true } func (h *heldMetricsStore[T]) getHeldMetricByHash(labels []string, hash uint64) (*heldMetric[T], bool) { @@ -131,6 +148,9 @@ func (h *heldMetricsStore[T]) tryCreate(labels []string, hash uint64, newPromMet } hMetric = newHeldMetric(labels, metric) + if h.newBuffer != nil { + hMetric.buffer.Store(h.newBuffer()) + } h.metricsByHash[hash] = append(h.metricsByHash[hash], hMetric) return hMetric } @@ -159,6 +179,23 @@ func (h 
*heldMetricsStore[T]) DeleteOldMetrics(holdDuration time.Duration, delet if len(releasedMetrics) == 0 { delete(h.metricsByHash, hash) + } else { + h.metricsByHash[hash] = releasedMetrics + } + } +} + +func (h *heldMetricsStore[T]) flushAll(flushFn func(*heldMetric[T])) { + if h.newBuffer == nil { + return + } + h.mu.RLock() + defer h.mu.RUnlock() + for _, hMetrics := range h.metricsByHash { + for _, hm := range hMetrics { + if hm.buffer.Load() != nil { + flushFn(hm) + } } } } @@ -176,14 +213,12 @@ func (h *heldMetricsStore[T]) truncateLabels(lvs []string) { } func findHeldMetricIndex[T prometheus.Metric](hMetrics []*heldMetric[T], labels []string) int { - idx := -1 for i := range hMetrics { if slices.Equal(hMetrics[i].labels, labels) { - idx = i - break + return i } } - return idx + return -1 } func computeStringsHash(s []string) uint64 { diff --git a/metric/metric_test.go b/metric/metric_test.go index 818fc8681..6a6722a89 100644 --- a/metric/metric_test.go +++ b/metric/metric_test.go @@ -14,7 +14,7 @@ import ( func TestLabelExpiration(t *testing.T) { r := require.New(t) - ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) c := ctl.RegisterCounterVec("errors", "", "level") now := time.Now().UnixNano() @@ -71,7 +71,7 @@ func TestLabelTruncation(t *testing.T) { maxLabelValueLength := 10 label := "some_long_label_value" - ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, maxLabelValueLength) + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, maxLabelValueLength) c := ctl.RegisterCounterVec("errors", "", "level") c.WithLabelValues(label).Inc() @@ -81,6 +81,42 @@ func TestLabelTruncation(t *testing.T) { r.Equal(float64(0), c.WithLabelValues("some_long_").ToFloat64()) } +func TestCounterVecBuffering(t *testing.T) { + r := require.New(t) + + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, time.Second, 0) + c := ctl.RegisterCounterVec("errors", "", 
"level") + ctl.AddToHolder(c) + + c.WithLabelValues("error").Inc() + c.WithLabelValues("error").Add(4) + + r.Equal(float64(0), c.WithLabelValues("error").ToFloat64()) + + ctl.Flush() + + r.Equal(float64(5), c.WithLabelValues("error").ToFloat64()) +} + +func TestGaugeVecBuffering(t *testing.T) { + r := require.New(t) + + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, time.Second, 0) + g := ctl.RegisterGaugeVec("queue_size", "", "priority") + ctl.AddToHolder(g) + + g.WithLabelValues("high").Set(10) + g.WithLabelValues("high").Dec() + + g.WithLabelValues("high").Set(10) + g.WithLabelValues("high").Add(5) + g.WithLabelValues("high").Dec() + + r.Equal(float64(0), g.WithLabelValues("high").ToFloat64()) + ctl.Flush() + r.Equal(float64(14), g.WithLabelValues("high").ToFloat64()) +} + var holderBenchCases = []struct { Labels []string LabelValues [][]string @@ -113,7 +149,7 @@ var holderBenchCases = []struct { func BenchmarkMetricHolder(b *testing.B) { for _, benchCase := range holderBenchCases { - ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) counter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...) ctl.AddToHolder(counter) @@ -132,7 +168,7 @@ func BenchmarkMetricHolder(b *testing.B) { func BenchmarkPromVec(b *testing.B) { for _, benchCase := range holderBenchCases { - ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) counter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...) 
name := strings.Join(benchCase.Labels, "_") diff --git a/pipeline/README.idoc.md b/pipeline/README.idoc.md index 5bf384e30..dfec30b25 100644 --- a/pipeline/README.idoc.md +++ b/pipeline/README.idoc.md @@ -147,6 +147,7 @@ pipelines: settings: metrics: hold_duration: 1h + flush_interval: 30s max_label_value_length: 100 ``` @@ -158,6 +159,12 @@ The amount of time the metric can be idle until it is deleted. Used for deleting
+**`flush_interval`** *`string`* *`default=0s`* + +Defines how often locally buffered values of held metrics are flushed to Prometheus. Buffering reduces the number of direct Prometheus calls by accumulating counter deltas, gauge mutations, and histogram observations locally and applying them in a single batch on each flush, so exported values may lag behind by up to one flush interval. The value must be a duration string with a unit suffix (`ms`, `s`, `m` or `h`), for example `30s`. If zero, buffering is disabled and all metric writes go directly to Prometheus (default behaviour). + +
+ **`max_label_value_length`** *`int`* *`default=0`* Maximum length of custom metric labels in action plugins. If zero, no limit is set. diff --git a/pipeline/README.md b/pipeline/README.md index 16072d00d..7e3b7bd0f 100755 --- a/pipeline/README.md +++ b/pipeline/README.md @@ -147,6 +147,7 @@ pipelines: settings: metrics: hold_duration: 1h + flush_interval: 30s max_label_value_length: 100 ``` @@ -158,6 +159,12 @@ The amount of time the metric can be idle until it is deleted. Used for deleting
+**`flush_interval`** *`string`* *`default=0s`* + +Defines how often locally buffered values of held metrics are flushed to Prometheus. Buffering reduces the number of direct Prometheus calls by accumulating counter deltas, gauge mutations, and histogram observations locally and applying them in a single batch on each flush, so exported values may lag behind by up to one flush interval. The value must be a duration string with a unit suffix (`ms`, `s`, `m` or `h`), for example `30s`. If zero, buffering is disabled and all metric writes go directly to Prometheus (default behaviour). + +
+ **`max_label_value_length`** *`int`* *`default=0`* Maximum length of custom metric labels in action plugins. If zero, no limit is set. diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go index c0c5552ce..9deac34dc 100644 --- a/pipeline/antispam/antispammer_test.go +++ b/pipeline/antispam/antispammer_test.go @@ -18,7 +18,7 @@ func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Dur Threshold: threshold, UnbanIterations: unbanIterations, Logger: logger.Instance.Named("antispam").Desugar(), - MetricsController: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0), + MetricsController: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0), }) } diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go index f9ff0467a..1d2b69c94 100644 --- a/pipeline/backoff_test.go +++ b/pipeline/backoff_test.go @@ -24,7 +24,7 @@ func TestBackoff(t *testing.T) { batcherBackoff := NewRetriableBatcher( &BatcherOptions{ - MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0, 0), }, func(workerData *WorkerData, batch *Batch) error { eventCount.Inc() @@ -54,7 +54,7 @@ func TestBackoffWithError(t *testing.T) { batcherBackoff := NewRetriableBatcher( &BatcherOptions{ - MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0, 0), }, func(workerData *WorkerData, batch *Batch) error { return errors.New("some error") @@ -84,7 +84,7 @@ func TestBackoffWithErrorWithDeadQueue(t *testing.T) { batcherBackoff := NewRetriableBatcher( &BatcherOptions{ - MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0, 0), }, func(workerData *WorkerData, batch *Batch) error { return errors.New("some error") diff --git a/pipeline/batch_test.go 
b/pipeline/batch_test.go index 32a5f1e70..e6d9b8208 100644 --- a/pipeline/batch_test.go +++ b/pipeline/batch_test.go @@ -67,7 +67,7 @@ func TestBatcher(t *testing.T) { Workers: 8, BatchSizeCount: batchSize, FlushTimeout: time.Second, - MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0, 0), }) ctx := context.TODO() @@ -138,7 +138,7 @@ func TestBatcherMaxSize(t *testing.T) { Workers: 8, BatchSizeBytes: batchSize, FlushTimeout: time.Minute, - MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0, 0), }) batcher.Start(context.Background()) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 2b6947aed..c2813365d 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -39,8 +39,9 @@ const ( DefaultEventTimeout = time.Second * 30 DefaultFieldValue = "not_set" DefaultStreamName = StreamName("not_set") - DefaultMetricHoldDuration = time.Minute * 30 DefaultMetaCacheSize = 1024 + DefaultMetricHoldDuration = time.Minute * 30 + DefaultMetricFlushInterval = time.Duration(0) DefaultMetricMaxLabelValueLength = 0 EventSeqIDError = uint64(0) @@ -163,6 +164,7 @@ type Settings struct { type MetricSettings struct { HoldDuration time.Duration + FlushInterval time.Duration MaxLabelValueLength int } @@ -182,7 +184,12 @@ const ( // New creates new pipeline. Consider using `SetupHTTPHandlers` next. 
func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap.Logger) *Pipeline { - metricCtl := metric.NewCtl("pipeline_"+name, registry, settings.Metric.HoldDuration, settings.Metric.MaxLabelValueLength) + metricCtl := metric.NewCtl( + "pipeline_"+name, registry, + settings.Metric.HoldDuration, + settings.Metric.FlushInterval, + settings.Metric.MaxLabelValueLength, + ) var eventPool pool switch settings.Pool { @@ -352,6 +359,8 @@ func (p *Pipeline) Start() { p.streamer.start() + p.metricCtl.Start() + go p.maintenance() go p.antispammerMaintenance() if !p.useSpread { @@ -368,6 +377,8 @@ func (p *Pipeline) Stop() { processor.stop() } + p.metricCtl.Stop() + p.streamer.stop() p.logger.Info("stopping input") diff --git a/plugin/action/throttle/redis_limiter_test.go b/plugin/action/throttle/redis_limiter_test.go index 9fdb9ec5d..bab1064fe 100644 --- a/plugin/action/throttle/redis_limiter_test.go +++ b/plugin/action/throttle/redis_limiter_test.go @@ -317,7 +317,7 @@ func Test_updateKeyLimit(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) lim := newRedisLimiter( &limiterConfig{ ctx: ctx, diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go index 3622e9a5c..b4d427a5d 100644 --- a/plugin/input/file/provider_test.go +++ b/plugin/input/file/provider_test.go @@ -33,7 +33,7 @@ func TestRefreshSymlinkOnBrokenLink(t *testing.T) { panic(err.Error()) } defer os.Remove(linkName) - ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) metrics := newMetricCollection( ctl.RegisterCounter("worker1", "help_test"), ctl.RegisterCounter("worker2", "help_test"), @@ -169,7 +169,7 @@ func TestProviderWatcherPaths(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t 
*testing.T) { - ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) config := tt.config test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)}) metrics := newMetricCollection( diff --git a/plugin/input/file/watcher_test.go b/plugin/input/file/watcher_test.go index 1dc000d9f..5fdb220a4 100644 --- a/plugin/input/file/watcher_test.go +++ b/plugin/input/file/watcher_test.go @@ -35,7 +35,7 @@ func TestWatcher(t *testing.T) { notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) { shouldCreate.Inc() } - ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) w := NewWatcher( dir, Paths{ @@ -104,7 +104,7 @@ func TestWatcherPaths(t *testing.T) { notifyFn := func(_ notify.Event, _ string, _ os.FileInfo) { shouldCreate.Inc() } - ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0) w := NewWatcher( dir, Paths{ diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index e3defc21a..f5e4e9917 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -112,7 +112,7 @@ func TestWorkerWork(t *testing.T) { shouldSkip: *atomic.NewBool(false), mu: &sync.Mutex{}, } - ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0, 0) metrics := newMetricCollection( ctl.RegisterCounter("worker1", "help_test"), ctl.RegisterCounter("worker2", "help_test"), @@ -293,7 +293,7 @@ func TestWorkerWorkMultiData(t *testing.T) { mu: &sync.Mutex{}, } - ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0, 0) metrics := newMetricCollection( ctl.RegisterCounter("worker1", "help_test"), ctl.RegisterCounter("worker2", 
"help_test"), @@ -529,7 +529,7 @@ func TestWorkerRemoveAfter(t *testing.T) { if !tt.fileIsChanged { job.eofReadInfo.setOffset(int64(len(str))) } - ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0) + ctl := metric.NewCtl("test", prometheus.NewRegistry(), 0, 0, 0) metrics := newMetricCollection( ctl.RegisterCounter("worker1", "help_test"), ctl.RegisterCounter("worker2", "help_test"), diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go index 805c936f2..14a731d4d 100644 --- a/plugin/output/postgres/postgres_test.go +++ b/plugin/output/postgres/postgres_test.go @@ -90,7 +90,7 @@ func TestPrivateOut(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -160,7 +160,7 @@ func TestPrivateOutWithRetry(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -213,7 +213,7 @@ func TestPrivateOutNoGoodEvents(t *testing.T) { logger: testLogger, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -293,7 +293,7 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) { ctx: ctx, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{ {Root: root}, @@ -359,7 +359,7 @@ func 
TestPrivateOutWrongTypeInField(t *testing.T) { logger: testLogger, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}}) p.out(nil, batch) @@ -464,7 +464,7 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te ctx: ctx, } - p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + p.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) batch := pipeline.NewPreparedBatch([]*pipeline.Event{ {Root: root}, diff --git a/plugin/output/socket/socket_test.go b/plugin/output/socket/socket_test.go index 6fe73b14a..ac9653c08 100644 --- a/plugin/output/socket/socket_test.go +++ b/plugin/output/socket/socket_test.go @@ -110,7 +110,7 @@ func TestOut_DialError(t *testing.T) { t.Parallel() plugin := newPlugin(t, "tcp", ":9999", '\n') - plugin.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)) + plugin.registerMetrics(metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0)) workerData := pipeline.WorkerData(nil) err := plugin.out(&workerData, newBatch(t, `{"msg":"test"}`)) diff --git a/test/test.go b/test/test.go index e3b05e442..d4d3c2be1 100644 --- a/test/test.go +++ b/test/test.go @@ -240,7 +240,7 @@ func newDefaultParams() pipeline.PluginDefaultParams { return pipeline.PluginDefaultParams{ PipelineName: "test_pipeline", PipelineSettings: &pipeline.Settings{}, - MetricCtl: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0), + MetricCtl: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0, 0), } }