Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
5c8115e
cb for ch output plugin
Gwynbleidd0241 Mar 23, 2026
ca693af
cb for ch output plugin
Gwynbleidd0241 Mar 23, 2026
450afc9
Merge branch 'master' into 954-circuit-breaker-ch-plugin
Gwynbleidd0241 Mar 23, 2026
ab289dc
docs generate
Gwynbleidd0241 Mar 23, 2026
df2e46f
fix
Gwynbleidd0241 Mar 23, 2026
56abe5a
fix
Gwynbleidd0241 Apr 2, 2026
7bcf16a
cb
Gwynbleidd0241 Apr 13, 2026
ad16cfc
fix
Gwynbleidd0241 Apr 13, 2026
87d2efb
magic
Gwynbleidd0241 Apr 13, 2026
9303d57
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 Apr 14, 2026
cd432f8
fix
Gwynbleidd0241 Apr 14, 2026
6cdd995
fix
Gwynbleidd0241 Apr 14, 2026
91dffa4
remove cb in ch
Gwynbleidd0241 Apr 27, 2026
c9eeca8
refactor cb for http plugins
Gwynbleidd0241 Apr 27, 2026
7f18480
fix
Gwynbleidd0241 Apr 27, 2026
60396ae
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 6, 2026
341a95a
fix
Gwynbleidd0241 May 6, 2026
6c18f29
fix
Gwynbleidd0241 May 6, 2026
a334814
fix
Gwynbleidd0241 May 8, 2026
506beb5
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 20, 2026
8371f26
fix docs
Gwynbleidd0241 May 20, 2026
b3ee12e
change switch to if
Gwynbleidd0241 May 22, 2026
b43ebce
fix docs
Gwynbleidd0241 May 26, 2026
0cad8f6
fix
Gwynbleidd0241 May 27, 2026
d0fc78a
fix
Gwynbleidd0241 May 28, 2026
9ebe601
fix
Gwynbleidd0241 May 28, 2026
4002f45
without cb when one host
Gwynbleidd0241 May 29, 2026
26290db
Merge branch 'master' into 964-circuit-breaker
Gwynbleidd0241 May 29, 2026
86a0527
update e2e
Gwynbleidd0241 May 29, 2026
b1ae2af
fix
Gwynbleidd0241 May 29, 2026
a91fbc1
fix
Gwynbleidd0241 May 29, 2026
7507ca8
fix
Gwynbleidd0241 May 29, 2026
ccf7aac
fix
Gwynbleidd0241 May 29, 2026
6969042
fix
Gwynbleidd0241 May 29, 2026
e4c0dcd
fix
Gwynbleidd0241 May 29, 2026
90902c8
fix
Gwynbleidd0241 May 29, 2026
fe08704
fix
Gwynbleidd0241 May 29, 2026
bc9a93a
fox
Gwynbleidd0241 May 30, 2026
81a6bc8
fix
Gwynbleidd0241 May 30, 2026
2e54892
fix e2e
Gwynbleidd0241 May 30, 2026
429aa6a
finale fix e2e
Gwynbleidd0241 May 30, 2026
a83c7c9
fix
Gwynbleidd0241 May 30, 2026
63648af
fix
Gwynbleidd0241 Jun 1, 2026
f550a4d
fix
Gwynbleidd0241 Jun 1, 2026
1430118
fix
Gwynbleidd0241 Jun 1, 2026
3af4263
fix
Gwynbleidd0241 Jun 1, 2026
25aa854
fix
Gwynbleidd0241 Jun 1, 2026
0b7cbb6
fix
Gwynbleidd0241 Jun 2, 2026
d861a8f
fix
Gwynbleidd0241 Jun 2, 2026
fa677d0
add metric
Gwynbleidd0241 Jun 2, 2026
13e4569
fix update metric
Gwynbleidd0241 Jun 2, 2026
bf21341
fix
Gwynbleidd0241 Jun 2, 2026
ab7d1a5
update logs
Gwynbleidd0241 Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,18 @@ Process ES response and report errors, if any.

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.
If set to 0, circuit breaker is disabled.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for reconnecting to addresses that are unavailable during initialization.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
33 changes: 27 additions & 6 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,19 @@ type Config struct {
// >
// > Process ES response and report errors, if any.
ProcessResponse bool `json:"process_response" default:"true"` // *

// > @3@4@5@6
// >
// > Period for which addresses will be banned in case of unavailability.
// > If set to 0, circuit breaker is disabled.
BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
BanPeriod_ time.Duration

// > @3@4@5@6
// >
// > Interval for reconnecting to addresses that are unavailable during initialization.
ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
ReconnectInterval_ time.Duration
}

type KeepAliveConfig struct {
Expand Down Expand Up @@ -243,8 +256,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
if len(p.config.IndexValues) == 0 {
p.config.IndexValues = append(p.config.IndexValues, "@time")
}
if p.config.ReconnectInterval_ < 1 {
p.logger.Fatal("'reconnect_interval' can't be <1")
}
if p.config.BanPeriod_ < 0 {
p.logger.Fatal("'ban_period' cant't be <0")
}

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.prepareClient()
p.prepareClient(ctx)

p.maintenance(nil)

Expand Down Expand Up @@ -295,9 +317,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.batcher.Start(ctx)
}

Expand All @@ -315,11 +334,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.indexingErrorsMetric = ctl.RegisterCounter("output_elasticsearch_index_error_total", "Number of elasticsearch indexing errors")
}

func (p *Plugin) prepareClient() {
func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: prepareEndpoints(p.config.Endpoints, p.config.IngestPipeline),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
BanPeriod: p.config.BanPeriod_,
ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
Expand All @@ -335,7 +356,7 @@ func (p *Plugin) prepareClient() {
}

var err error
p.client, err = xhttp.NewClient(config)
p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
Expand Down
13 changes: 13 additions & 0 deletions plugin/output/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,18 @@ After a non-retryable write error, fall with a non-zero exit code or not

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.
If set to 0, circuit breaker is disabled.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for reconnecting to addresses that are unavailable during initialization.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
34 changes: 28 additions & 6 deletions plugin/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ type Config struct {
// >
// > After a non-retryable write error, fall with a non-zero exit code or not
Strict bool `json:"strict" default:"false"` // *

// > @3@4@5@6
// >
// > Period for which addresses will be banned in case of unavailability.
// > If set to 0, circuit breaker is disabled.
BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
BanPeriod_ time.Duration

// > @3@4@5@6
// >
// > Interval for reconnecting to addresses that are unavailable during initialization.
ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
ReconnectInterval_ time.Duration
}

type KeepAliveConfig struct {
Expand Down Expand Up @@ -200,7 +213,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.mu = &sync.Mutex{}

p.prepareClient()
if p.config.ReconnectInterval_ < 1 {
p.logger.Fatal("'reconnect_interval' can't be <1")
}
if p.config.BanPeriod_ < 0 {
p.logger.Fatal("'ban_period' cant't be <0")
}

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.prepareClient(ctx)

p.logger.Info("starting batcher", zap.Duration("timeout", p.config.BatchFlushTimeout_))

Expand Down Expand Up @@ -249,9 +272,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.batcher.Start(ctx)
}

Expand All @@ -268,11 +288,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounterVec("output_http_send_error_total", "Total HTTP send errors", "status_code")
}

func (p *Plugin) prepareClient() {
func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: p.prepareEndpoints(),
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
BanPeriod: p.config.BanPeriod_,
ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
Expand All @@ -288,7 +310,7 @@ func (p *Plugin) prepareClient() {
}

var err error
p.client, err = xhttp.NewClient(config)
p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
Expand Down
13 changes: 13 additions & 0 deletions plugin/output/loki/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,18 @@ Multiplier for exponential increase of retention between retries

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.
If set to 0, circuit breaker is disabled.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for reconnecting to addresses that are unavailable during initialization.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
36 changes: 29 additions & 7 deletions plugin/output/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ type Config struct {
// >
// > Multiplier for exponential increase of retention between retries
RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *

// > @3@4@5@6
// >
// > Period for which addresses will be banned in case of unavailability.
// > If set to 0, circuit breaker is disabled.
BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
BanPeriod_ time.Duration

// > @3@4@5@6
// >
// > Interval for reconnecting to addresses that are unavailable during initialization.
ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
ReconnectInterval_ time.Duration
}

type AuthStrategy byte
Expand Down Expand Up @@ -259,7 +272,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP

p.labels = p.parseLabels()

p.prepareClient()
if p.config.ReconnectInterval_ < 1 {
p.logger.Fatal("'reconnect_interval' can't be <1")
}
if p.config.BanPeriod_ < 0 {
p.logger.Fatal("'ban_period' cant't be <0")
}

ctx, cancel := context.WithCancel(context.Background())
p.ctx = ctx
p.cancel = cancel

p.prepareClient(ctx)

batcherOpts := &pipeline.BatcherOptions{
PipelineName: params.PipelineName,
Expand Down Expand Up @@ -303,10 +327,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)

ctx, cancel := context.WithCancel(context.Background())
p.ctx = ctx
p.cancel = cancel

p.batcher.Start(ctx)
}

Expand Down Expand Up @@ -430,20 +450,22 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.sendErrorMetric = ctl.RegisterCounterVec("output_loki_send_error_total", "Total Loki send errors", "status_code")
}

func (p *Plugin) prepareClient() {
func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: []string{fmt.Sprintf("%s/loki/api/v1/push", p.config.Address)},
ConnectionTimeout: p.config.ConnectionTimeout_ * 2,
AuthHeader: p.getAuthHeader(),
CustomHeaders: p.getCustomHeaders(),
BanPeriod: p.config.BanPeriod_,
ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
},
}

var err error
p.client, err = xhttp.NewClient(config)
p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
Expand Down
13 changes: 13 additions & 0 deletions plugin/output/splunk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,18 @@ or the "event" key with any of its subkeys.

<br>

**`ban_period`** *`cfg.Duration`* *`default=10s`*

Period for which addresses will be banned in case of unavailability.
If set to 0, circuit breaker is disabled.

<br>

**`reconnect_interval`** *`cfg.Duration`* *`default=5s`*

Interval for reconnecting to addresses that are unavailable during initialization.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
35 changes: 29 additions & 6 deletions plugin/output/splunk/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ type Config struct {
// > Supports copying whole original event, but does not allow to copy directly to the output root
// > or the "event" key with any of its subkeys.
CopyFields []CopyField `json:"copy_fields" slice:"true"` // *

// > @3@4@5@6
// >
// > Period for which addresses will be banned in case of unavailability.
// > If set to 0, circuit breaker is disabled.
BanPeriod cfg.Duration `json:"ban_period" default:"10s" parse:"duration"` // *
BanPeriod_ time.Duration

// > @3@4@5@6
// >
// > Interval for reconnecting to addresses that are unavailable during initialization.
ReconnectInterval cfg.Duration `json:"reconnect_interval" default:"5s" parse:"duration"` // *
ReconnectInterval_ time.Duration
}

type KeepAliveConfig struct {
Expand Down Expand Up @@ -235,7 +248,18 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.avgEventSize = params.PipelineSettings.AvgEventSize
p.config = config.(*Config)
p.registerMetrics(params.MetricCtl)
p.prepareClient()

if p.config.ReconnectInterval_ < 1 {
p.logger.Fatal("'reconnect_interval' can't be <1")
}
if p.config.BanPeriod_ < 0 {
p.logger.Fatal("'ban_period' cant't be <0")
}

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.prepareClient(ctx)

for _, cf := range p.config.CopyFields {
if cf.To == "" {
Expand Down Expand Up @@ -296,9 +320,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
onError,
)

ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

p.batcher.Start(ctx)
}

Expand All @@ -319,11 +340,13 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
)
}

func (p *Plugin) prepareClient() {
func (p *Plugin) prepareClient(ctx context.Context) {
config := &xhttp.ClientConfig{
Endpoints: []string{p.config.Endpoint},
ConnectionTimeout: p.config.RequestTimeout_,
AuthHeader: "Splunk " + p.config.Token,
BanPeriod: p.config.BanPeriod_,
ReconnectInterval: p.config.ReconnectInterval_,
KeepAlive: &xhttp.ClientKeepAliveConfig{
MaxConnDuration: p.config.KeepAlive.MaxConnDuration_,
MaxIdleConnDuration: p.config.KeepAlive.MaxIdleConnDuration_,
Expand All @@ -338,7 +361,7 @@ func (p *Plugin) prepareClient() {
}

var err error
p.client, err = xhttp.NewClient(config)
p.client, err = xhttp.NewClient(ctx, config)
if err != nil {
p.logger.Fatal("can't create http client", zap.Error(err))
}
Expand Down
Loading
Loading