diff --git a/pkg/csplugin/broker.go b/pkg/csplugin/broker.go index ab5a7a7bedc..7b44a50d184 100644 --- a/pkg/csplugin/broker.go +++ b/pkg/csplugin/broker.go @@ -51,6 +51,7 @@ type PluginBroker struct { pluginProcConfig *csconfig.PluginCfg pluginsTypesToDispatch map[string]struct{} newBackoff backoffFactory + deliveryWg sync.WaitGroup } // holder to determine where to dispatch config and how to format messages @@ -146,7 +147,9 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) pluginMutex.Unlock() + pb.deliveryWg.Add(1) go func() { + defer pb.deliveryWg.Done() // Chunk alerts to respect group_threshold threshold := pb.pluginConfigByName[pluginName].GroupThreshold if threshold == 0 { @@ -162,11 +165,45 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { case <-pluginTomb.Dying(): log.Infof("pluginTomb dying") + + // Step 1: Drain PluginChannel — alerts not yet picked up by the + // normal select case. Deliver them directly. + drainLoop: + for { + select { + case profileAlert := <-pb.PluginChannel: + pb.addProfileAlert(profileAlert) + default: + break drainLoop + } + } + + // Step 2: Flush alerts already queued in alertsByPluginName by + // addProfileAlert. The watcher ticker (e.g. 30s group_wait) hasn't + // fired yet, and the inner loop races Dead() vs PluginEvents — + // Dead() can win, losing the alert. Deliver directly. + pluginMutex.Lock() + alertsCopy := make(map[string][]*models.Alert) + for k, v := range pb.alertsByPluginName { + if len(v) > 0 { + alertsCopy[k] = v + pb.alertsByPluginName[k] = nil + } + } + pluginMutex.Unlock() + for pluginName, alerts := range alertsCopy { + if err := pb.pushNotificationsToPlugin(ctx, pluginName, alerts); err != nil { + log.WithField("plugin:", pluginName).Error(err) + } + } + pb.watcher.tomb.Kill(errors.New("Terminating")) for { select { case <-pb.watcher.tomb.Dead(): + log.Info("waiting for in-flight deliveries to complete") + pb.deliveryWg.Wait() log.Info("killing all plugins") pb.Kill() @@ -179,9 +216,13 @@ func (pb *PluginBroker) Run(pluginTomb *tomb.Tomb) { pb.alertsByPluginName[pluginName] = make([]*models.Alert, 0) pluginMutex.Unlock() - if err := pb.pushNotificationsToPlugin(ctx, pluginName, tmpAlerts); err != nil { - log.WithField("plugin:", pluginName).Error(err) - } + pb.deliveryWg.Add(1) + go func() { + defer pb.deliveryWg.Done() + if err := pb.pushNotificationsToPlugin(ctx, pluginName, tmpAlerts); err != nil { + log.WithField("plugin:", pluginName).Error(err) + } + }() } } }