Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions pkg/csplugin/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PluginBroker struct {
pluginProcConfig *csconfig.PluginCfg
pluginsTypesToDispatch map[string]struct{}
newBackoff backoffFactory
deliveryWg sync.WaitGroup
}

// holder to determine where to dispatch config and how to format messages
Expand Down Expand Up @@ -146,7 +147,9 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {
pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
pluginMutex.Unlock()

pb.deliveryWg.Add(1)
go func() {
defer pb.deliveryWg.Done()
// Chunk alerts to respect group_threshold
threshold := pb.pluginConfigByName[pluginName].GroupThreshold
if threshold == 0 {
Expand All @@ -162,11 +165,45 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {

case <-pluginTomb.Dying():
log.Infof("pluginTomb dying")

// Step 1: Drain PluginChannel — alerts not yet picked up by the
// normal select case. Deliver them directly.
drainLoop:
for {
select {
case profileAlert := <-pb.PluginChannel:
pb.addProfileAlert(profileAlert)
default:
break drainLoop
}
}

// Step 2: Flush alerts already queued in alertsByPluginName by
// addProfileAlert. The watcher ticker (e.g. 30s group_wait) hasn't
// fired yet, and the inner loop races Dead() vs PluginEvents —
// Dead() can win, losing the alert. Deliver directly.
pluginMutex.Lock()
alertsCopy := make(map[string][]*models.Alert)
for k, v := range pb.alertsByPluginName {
if len(v) > 0 {
alertsCopy[k] = v
pb.alertsByPluginName[k] = nil
}
}
pluginMutex.Unlock()
for pluginName, alerts := range alertsCopy {
if err := pb.pushNotificationsToPlugin(ctx, pluginName, alerts); err != nil {
log.WithField("plugin:", pluginName).Error(err)
}
}

pb.watcher.tomb.Kill(errors.New("Terminating"))

for {
select {
case <-pb.watcher.tomb.Dead():
log.Info("waiting for in-flight deliveries to complete")
pb.deliveryWg.Wait()
log.Info("killing all plugins")
pb.Kill()

Expand All @@ -179,9 +216,13 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) {
pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0)
pluginMutex.Unlock()

if err := pb.pushNotificationsToPlugin(ctx, pluginName, tmpAlerts); err != nil {
log.WithField("plugin:", pluginName).Error(err)
}
pb.deliveryWg.Add(1)
go func() {
defer pb.deliveryWg.Done()
if err := pb.pushNotificationsToPlugin(ctx, pluginName, tmpAlerts); err != nil {
log.WithField("plugin:", pluginName).Error(err)
}
}()
}
}
}
Expand Down
Loading