-
Notifications
You must be signed in to change notification settings - Fork 217
PMM-15143 Fix resource leaks in managed #5484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v3
Are you sure you want to change the base?
Changes from 7 commits
57ea696
8f6c3fc
a8599ae
65f9868
3bd1c67
69abcbc
4813634
6086c01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -94,6 +94,11 @@ type Service struct { | |
| startDelay time.Duration | ||
| customCheckFile string // For testing | ||
|
|
||
| runCtxM sync.Mutex | ||
| // runCtx is the service lifecycle context recorded by Run. It bounds | ||
| // asynchronous work started via StartChecks so it is cancelled on shutdown. | ||
| runCtx context.Context //nolint:containedctx | ||
|
|
||
| am sync.Mutex | ||
| advisors []check.Advisor | ||
| checks map[string]check.Check // Checks extracted from advisors and stored by name. | ||
|
|
@@ -134,6 +139,7 @@ func New( | |
| l: l, | ||
| startDelay: defaultStartDelay, | ||
| customCheckFile: os.Getenv(envCheckFile), | ||
| runCtx: context.Background(), | ||
|
|
||
| mChecksExecuted: prom.NewCounterVec(prom.CounterOpts{ | ||
| Namespace: prometheusNamespace, | ||
|
|
@@ -171,20 +177,25 @@ func (s *Service) Run(ctx context.Context) { | |
| s.l.Info("Starting...") | ||
| defer s.l.Info("Done.") | ||
|
|
||
| s.runCtxM.Lock() | ||
| s.runCtx = ctx | ||
| s.runCtxM.Unlock() | ||
|
|
||
| s.UpdateAdvisorsList(ctx) | ||
| settings, err := models.GetSettings(s.db) | ||
| if err != nil { | ||
| s.l.Errorf("Failed to get settings: %+v.", err) | ||
| return | ||
| } | ||
|
|
||
| s.tm.Lock() | ||
| s.rareTicker = time.NewTicker(settings.SaaS.AdvisorRunIntervals.RareInterval) | ||
| defer s.rareTicker.Stop() | ||
|
|
||
| s.standardTicker = time.NewTicker(settings.SaaS.AdvisorRunIntervals.StandardInterval) | ||
| defer s.standardTicker.Stop() | ||
|
|
||
| s.frequentTicker = time.NewTicker(settings.SaaS.AdvisorRunIntervals.FrequentInterval) | ||
| s.tm.Unlock() | ||
|
|
||
| defer s.rareTicker.Stop() | ||
| defer s.standardTicker.Stop() | ||
| defer s.frequentTicker.Stop() | ||
|
|
||
| // delay for the first run to allow all agents to connect | ||
|
|
@@ -275,8 +286,11 @@ func (s *Service) StartChecks(checkNames []string) error { | |
| return services.ErrAdvisorsDisabled | ||
| } | ||
|
|
||
| s.runCtxM.Lock() | ||
| ctx := s.runCtx | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no need to pass upper-level context here, much simpler to implement via |
||
| s.runCtxM.Unlock() | ||
|
|
||
| go func() { | ||
| ctx := context.Background() | ||
| s.UpdateAdvisorsList(ctx) | ||
| err := s.run(ctx, "", checkNames) | ||
| if err != nil { | ||
|
|
@@ -1636,10 +1650,16 @@ func (s *Service) updateAdvisors(advisors []check.Advisor) { | |
| // UpdateIntervals updates advisor checks restart timer intervals. | ||
| func (s *Service) UpdateIntervals(rare, standard, frequent time.Duration) { | ||
| s.tm.Lock() | ||
| defer s.tm.Unlock() | ||
| // Tickers are created by Run; if it has not started on this node (e.g. not | ||
| // the leader), there is nothing to reset - Run reads the new intervals from | ||
| // the persisted settings when it starts. | ||
| if s.rareTicker == nil || s.standardTicker == nil || s.frequentTicker == nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't the update request be sent to the current leader node only? |
||
| return | ||
| } | ||
| s.rareTicker.Reset(rare) | ||
| s.standardTicker.Reset(standard) | ||
| s.frequentTicker.Reset(frequent) | ||
| s.tm.Unlock() | ||
|
|
||
| s.l.Infof("Intervals are changed: rare %s, standard %s, frequent %s", rare, standard, frequent) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,13 +74,13 @@ func (s *services) StartAllServices(ctx context.Context) { | |
| for id, service := range s.all { | ||
| if _, ok := s.running[id]; !ok { | ||
| s.running[id] = service | ||
| s.wg.Add(1) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the most important fix here ) |
||
| toStart = append(toStart, startItem{svc: service, id: id}) | ||
| } | ||
| } | ||
| s.rw.Unlock() | ||
|
|
||
| for _, service := range toStart { | ||
| s.wg.Add(1) | ||
| go func(svc LeaderService, svcID string) { | ||
| s.l.Infoln("Starting", svcID) | ||
| err := svc.Start(ctx) | ||
|
|
@@ -122,7 +122,12 @@ func (s *services) Wait() { | |
| // removeService removes a service from the registry of running services. | ||
| func (s *services) removeService(id string) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the logic of this function shall be only in one place - at the ent of go-routine in line 90. This function is not needed at all because it creates a dangerous situation with |
||
| s.rw.Lock() | ||
| _, ok := s.running[id] | ||
| delete(s.running, id) | ||
| s.rw.Unlock() | ||
| s.wg.Done() | ||
| // Only decrement for a service we actually removed | ||
| // to avoid negative counter and panic. | ||
| if ok { | ||
| s.wg.Done() | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is anti-pattern. https://pkg.go.dev/context