Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 116 additions & 109 deletions src/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cattery/lib/messages"
"context"
"errors"
"fmt"
"os"
"os/signal"
"path"
Expand All @@ -16,13 +17,15 @@ import (

"github.com/fsnotify/fsnotify"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

var RunnerFolder string
var CatteryServerUrl string
var Id string

// shutdownCause is used as context.Cause to carry the termination reason.
// shutdownCause is returned by watchers to report why shutdown was requested.
// Satisfies error so it can propagate through errgroup.
type shutdownCause struct {
reason messages.UnregisterReason
message string
Expand Down Expand Up @@ -56,7 +59,9 @@ func NewCatteryAgent(runnerFolder string, catteryServerUrl string, agentId strin
func (a *CatteryAgent) Start() {
a.logger.Info("Starting Cattery Agent")

agent, jitConfig, err := a.catteryClient.RegisterAgent(a.agentId)
registerCtx, cancelRegister := context.WithTimeout(context.Background(), 30*time.Second)
agent, jitConfig, err := a.catteryClient.RegisterAgent(registerCtx, a.agentId)
cancelRegister()
if err != nil {
a.logger.Errorf("Failed to register agent: %v", err)
return
Expand All @@ -65,55 +70,59 @@ func (a *CatteryAgent) Start() {

a.logger.Info("Agent registered, starting Listener")

ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)

a.watchSignal(ctx, cancel)
a.watchFile(ctx, cancel)
a.watchPing(ctx, cancel)

var ghListener = githubListener.NewGithubListener(a.listenerExecPath)
ghListener.Start(ctx, cancel, jitConfig)

// Block until any source triggers cancellation
<-ctx.Done()
// File watcher setup is synchronous so startup fails fast and doesn't
// leak a goroutine or half-initialized fsnotify state.
watcher, err := a.setupFileWatcher()
if err != nil {
a.logger.Errorf("Failed to start file watcher: %v", err)
a.unregisterAndShutdown(messages.UnregisterReasonDone, "file watcher setup: "+err.Error())
return
}
defer watcher.Close()

g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error { return a.watchSignal(ctx) })
g.Go(func() error { return a.watchFile(ctx, watcher) })
g.Go(func() error { return a.watchPing(ctx) })
g.Go(func() error {
listener := githubListener.NewGithubListener(a.listenerExecPath)
err := listener.Run(ctx, jitConfig)
// Listener exit (clean or otherwise) must cancel the group. Translate
// into a shutdownCause so Wait() returns an error and errgroup signals
// the other watchers.
if err == nil {
return &shutdownCause{reason: messages.UnregisterReasonDone, message: "Listener finished"}
}
return &shutdownCause{reason: messages.UnregisterReasonDone, message: "Listener exited: " + err.Error()}
})

// Determine what happened
reason, msg := a.resolveShutdownCause(ctx)
reason, msg := a.resolveShutdownCause(g.Wait())
a.logger.Infof("Shutdown: reason=%d, message=%s", reason, msg)

// Kill listener if it wasn't the one that finished
if reason != messages.UnregisterReasonDone {
ghListener.Stop()
}

a.unregisterAndShutdown(reason, msg)
}

// resolveShutdownCause extracts the termination reason from the context cause.
// - shutdownCause: a watcher triggered shutdown (signal, file, ping)
// - nil cause: listener exited cleanly
// - other error: listener exited with error
func (a *CatteryAgent) resolveShutdownCause(ctx context.Context) (messages.UnregisterReason, string) {
cause := context.Cause(ctx)

// resolveShutdownCause unwraps the first error returned by the errgroup.
// Watchers wrap their shutdown reasons in *shutdownCause; anything else is
// treated as a listener-finished signal.
func (a *CatteryAgent) resolveShutdownCause(err error) (messages.UnregisterReason, string) {
var sc *shutdownCause
if errors.As(cause, &sc) {
if errors.As(err, &sc) {
return sc.reason, sc.message
}

// Listener finished (cancel was called with nil or a process error)
if cause == nil {
if err == nil {
return messages.UnregisterReasonDone, "Listener finished"
}
return messages.UnregisterReasonDone, "Listener exited: " + cause.Error()
return messages.UnregisterReasonDone, err.Error()
}

func (a *CatteryAgent) unregisterAndShutdown(reason messages.UnregisterReason, msg string) {
log.Infof("Stopping Cattery Agent with reason: %d, message: `%s`", reason, msg)

err := a.catteryClient.UnregisterAgent(a.agent, reason, msg)
if err != nil {
unregisterCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := a.catteryClient.UnregisterAgent(unregisterCtx, a.agent, reason, msg); err != nil {
a.logger.Errorf("Failed to unregister agent: %v", err)
}

Expand All @@ -123,93 +132,91 @@ func (a *CatteryAgent) unregisterAndShutdown(reason messages.UnregisterReason, m
}
}

func (a *CatteryAgent) watchSignal(ctx context.Context, cancel context.CancelCauseFunc) {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

select {
case <-ctx.Done():
return
case sig := <-sigs:
a.logger.Info("Got signal ", sig)
cancel(&shutdownCause{
reason: messages.UnregisterReasonSigTerm,
message: "Got signal " + sig.String(),
})
func (a *CatteryAgent) watchSignal(ctx context.Context) error {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigs)

select {
case <-ctx.Done():
return ctx.Err()
case sig := <-sigs:
a.logger.Info("Got signal ", sig)
return &shutdownCause{
reason: messages.UnregisterReasonSigTerm,
message: "Got signal " + sig.String(),
}
}()
}
}

func (a *CatteryAgent) watchFile(ctx context.Context, cancel context.CancelCauseFunc) {
func (a *CatteryAgent) setupFileWatcher() (*fsnotify.Watcher, error) {
const shutdownFile = "./shutdown_file"

go func() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
a.logger.Fatalf("Failed to create file watcher: %v", err)
}
defer watcher.Close()
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("create file watcher: %w", err)
}

// Create the shutdown file if it doesn't exist
f, err := os.OpenFile(shutdownFile, os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
a.logger.Fatalf("Failed to create shutdown file: %v", err)
}
f.Close()
f, err := os.OpenFile(shutdownFile, os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
watcher.Close()
return nil, fmt.Errorf("create shutdown file: %w", err)
}
f.Close()

if err := watcher.Add(shutdownFile); err != nil {
a.logger.Fatalf("Failed to watch shutdown file: %v", err)
}
if err := watcher.Add(shutdownFile); err != nil {
watcher.Close()
return nil, fmt.Errorf("watch shutdown file: %w", err)
}

select {
case <-ctx.Done():
return
case event := <-watcher.Events:
msg := "Shutdown file changed: " + event.Name
a.logger.Info(msg)
cancel(&shutdownCause{
reason: messages.UnregisterReasonPreempted,
message: msg,
})
case watchErr := <-watcher.Errors:
msg := "File watcher error: " + watchErr.Error()
a.logger.Error(msg)
cancel(&shutdownCause{
reason: messages.UnregisterReasonPreempted,
message: msg,
})
}
}()
return watcher, nil
}

func (a *CatteryAgent) watchPing(ctx context.Context, cancel context.CancelCauseFunc) {
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
// watchFile blocks until the shutdown-file watcher reports activity or ctx is
// cancelled, whichever happens first.
//
// It returns ctx.Err() when ctx is done. The first event received on
// watcher.Events, or the first error received on watcher.Errors, is returned
// as a *shutdownCause with reason messages.UnregisterReasonPreempted. If
// either channel has been closed, a *shutdownCause with the same reason and
// the message "File watcher closed" is returned.
func (a *CatteryAgent) watchFile(ctx context.Context, watcher *fsnotify.Watcher) error {
	// Receiving from a closed channel yields the zero value: an empty event
	// would be misreported as a shutdown-file change, and a nil error would
	// panic on Error(). Both receives therefore check ok.
	const closedMsg = "File watcher closed"

	select {
	case <-ctx.Done():
		return ctx.Err()
	case event, ok := <-watcher.Events:
		if !ok {
			a.logger.Error(closedMsg)
			return &shutdownCause{reason: messages.UnregisterReasonPreempted, message: closedMsg}
		}
		msg := "Shutdown file changed: " + event.Name
		a.logger.Info(msg)
		return &shutdownCause{reason: messages.UnregisterReasonPreempted, message: msg}
	case watchErr, ok := <-watcher.Errors:
		if !ok || watchErr == nil {
			a.logger.Error(closedMsg)
			return &shutdownCause{reason: messages.UnregisterReasonPreempted, message: closedMsg}
		}
		msg := "File watcher error: " + watchErr.Error()
		a.logger.Error(msg)
		return &shutdownCause{reason: messages.UnregisterReasonPreempted, message: msg}
	}
}

pingResponse, err := a.catteryClient.Ping()
if err != nil {
a.logger.Errorf("Error pinging controller: %v", err)
time.Sleep(60 * time.Second)
continue
}
func (a *CatteryAgent) watchPing(ctx context.Context) error {
const pingInterval = 60 * time.Second
const pingTimeout = 15 * time.Second

for {
pingCtx, cancel := context.WithTimeout(ctx, pingTimeout)
pingResponse, err := a.catteryClient.Ping(pingCtx)
cancel()

// If ctx was cancelled while Ping was in flight, exit without logging
// a spurious transport error.
if ctx.Err() != nil {
return ctx.Err()
}

if pingResponse.Terminate {
msg := "Controller requested termination: " + pingResponse.Message
a.logger.Info(msg)
cancel(&shutdownCause{
reason: messages.UnregisterReasonControllerKill,
message: msg,
})
return
if err != nil {
a.logger.Errorf("Error pinging controller: %v", err)
} else if pingResponse.Terminate {
msg := "Controller requested termination: " + pingResponse.Message
a.logger.Info(msg)
return &shutdownCause{
reason: messages.UnregisterReasonControllerKill,
message: msg,
}
}

time.Sleep(60 * time.Second)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(pingInterval):
}
}()
}
}
Loading
Loading