Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cmd/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func runJob(ctx context.Context, helper worker.Helper, job opslevel.RunnerJob) p
logPrefix := func() string { return fmt.Sprintf("%s [%d] ", time.Now().UTC().Format(time.RFC3339), 0) }
streamer := pkg.NewLogStreamer(
logger,
logMaxBytes, // per-stream buffer cap tracks the ship-batch size
pkg.NewFaktorySetOutcomeProcessor(helper, logger, job.Id),
pkg.NewSanitizeLogProcessor(job.Variables),
pkg.NewPrefixLogProcessor(logPrefix),
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
rootCmd.PersistentFlags().String("job-pod-shell", "/bin/sh", "The job pod shell to use for commands run inside the pod.")
rootCmd.PersistentFlags().String("job-pod-workdir", "/jobs", "The job pod working directory.")
rootCmd.PersistentFlags().Int("job-pod-log-max-interval", 30, "The max amount of time between when pod logs are shipped to OpsLevel. Works in tandem with 'job-pod-log-max-size'")
rootCmd.PersistentFlags().Int("job-pod-log-max-size", 1000000, "The max amount in bytes to buffer before pod logs are shipped to OpsLevel. Works in tandem with 'job-pod-log-max-interval'")
rootCmd.PersistentFlags().Int("job-pod-log-max-size", 262144, "The max bytes of pod logs buffered before a batch is shipped to OpsLevel. Works in tandem with 'job-pod-log-max-interval'. Also the dominant per-job memory term: peak runner memory is roughly 'job-concurrency' * 6 * this value, so lower it when running high concurrency.")
rootCmd.PersistentFlags().Bool("job-agent-mode", false, "Enable agent mode with privileged security context for Container-in-Container support. WARNING: This grants elevated privileges and should only be enabled for trusted workloads.")
rootCmd.PersistentFlags().String("job-pod-helper-image", "", "Override the helper init container image. Defaults to the published ECR image matching the runner version. Useful for local development with kind.")
rootCmd.PersistentFlags().String("queue", "", "The queue this runner should process jobs from. Empty means the default queue.")
Expand Down
1 change: 1 addition & 0 deletions src/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func jobWorker(ctx context.Context, wg *sync.WaitGroup, index int, runnerId opsl

streamer := pkg.NewLogStreamer(
logger,
logMaxBytes, // per-stream buffer cap tracks the ship-batch size
pkg.NewSetOutcomeVarLogProcessor(client, logger, runnerId, jobId, jobNumber),
pkg.NewSanitizeLogProcessor(job.Variables),
pkg.NewPrefixLogProcessor(logPrefix),
Expand Down
1 change: 1 addition & 0 deletions src/cmd/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func doTest(cmd *cobra.Command, args []string) error {
}
streamer := pkg.NewLogStreamer(
log.Logger,
0, // unbounded buffers for local testing
pkg.NewSetOutcomeVarLogProcessor(nil, log.Logger, "1", "1", "1"),
pkg.NewSanitizeLogProcessor(job.Variables),
pkg.NewLoggerLogProcessor(log.Logger),
Expand Down
56 changes: 53 additions & 3 deletions src/pkg/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,50 @@ import (
"sync"
)

// SafeBuffer is a goroutine safe bytes.Buffer with an optional size cap.
//
// The cap exists to protect the runner from OOM: pod stdout/stderr is written
// into this buffer by client-go's exec stream, while the LogStreamer drains it
// on a ticker. If the drain side stalls (e.g. a slow log-shipping API call) or a
// job emits data faster than it can be drained, an unbounded buffer would grow
// until the process is killed. When maxSize is exceeded, writes beyond the cap
// are dropped and the dropped byte count is recorded so the loss can be
// surfaced by the consumer (see DroppedBytes) instead of going unnoticed.
type SafeBuffer struct {
	buffer  bytes.Buffer
	mutex   sync.Mutex // guards buffer and dropped
	maxSize int        // cap on unread bytes held in buffer; <= 0 means unbounded
	dropped int        // bytes discarded since the counter was last read
}

// NewSafeBuffer returns a SafeBuffer that drops writes once it holds maxSize
// bytes of unread data. A maxSize <= 0 means unbounded.
func NewSafeBuffer(maxSize int) *SafeBuffer {
	return &SafeBuffer{maxSize: maxSize}
}

// Write appends the contents of p to the buffer, growing the buffer as needed. It returns
// the number of bytes written.
//
// To the caller (client-go's exec stream copier) the write always "succeeds" —
// returning a short write or error would tear down the exec stream. When the cap
// is reached we accept as much as fits, drop the rest, and track how much was
// dropped. The returned count is therefore always len(p), even when some or all
// of p was discarded.
func (s *SafeBuffer) Write(p []byte) (n int, err error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.maxSize > 0 {
		room := s.maxSize - s.buffer.Len()
		if room <= 0 {
			// Already full: discard the whole write.
			s.dropped += len(p)
			return len(p), nil
		}
		if len(p) > room {
			// Keep the prefix that still fits and discard the tail.
			s.buffer.Write(p[:room])
			s.dropped += len(p) - room
			return len(p), nil
		}
	}
	return s.buffer.Write(p)
}

Expand All @@ -27,6 +60,23 @@ func (s *SafeBuffer) String() string {
return s.buffer.String()
}

// Len reports how many unread bytes the buffer currently holds. The value is
// read under the buffer's mutex, so it is safe to call concurrently with Write.
func (s *SafeBuffer) Len() int {
	s.mutex.Lock()
	unread := s.buffer.Len()
	s.mutex.Unlock()
	return unread
}

// DroppedBytes reports how many bytes have been dropped since the previous
// call and zeroes the counter, so each dropped byte is reported exactly once.
func (s *SafeBuffer) DroppedBytes() int {
	s.mutex.Lock()
	total := s.dropped
	s.dropped = 0
	s.mutex.Unlock()
	return total
}

// ReadString reads until the first occurrence of delim in the input,
// returning a string containing the data up to and including the delimiter.
// If ReadString encounters an error before finding a delimiter,
Expand Down
69 changes: 55 additions & 14 deletions src/pkg/faktoryRunnerAppendJobLogProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ type FaktoryAppendJobLogProcessor struct {
firstLine bool
lastTime time.Time
elapsed time.Duration
batches chan []string
done chan struct{}
droppedBatches int
}

func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID, maxBytes int, maxTime time.Duration) *FaktoryAppendJobLogProcessor {
return &FaktoryAppendJobLogProcessor{
s := &FaktoryAppendJobLogProcessor{
helper: helper,
logger: logger,
jobId: jobId,
Expand All @@ -34,7 +37,11 @@ func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog
logLinesBytesSize: 0,
firstLine: false,
lastTime: time.Now(),
batches: make(chan []string, shipQueueDepth),
done: make(chan struct{}),
}
go s.ship()
return s
}

func (s *FaktoryAppendJobLogProcessor) Process(line string) string {
Expand All @@ -57,7 +64,7 @@ func (s *FaktoryAppendJobLogProcessor) Process(line string) string {
s.elapsed += time.Since(s.lastTime)
if s.elapsed > s.maxTime {
s.logger.Trace().Msg("Shipping logs because of maxTime ...")
s.elapsed = time.Since(time.Now())
s.elapsed = 0
s.submit()
}
s.lastTime = time.Now()
Expand All @@ -74,21 +81,58 @@ func (s *FaktoryAppendJobLogProcessor) ProcessStderr(line string) string {
}

func (s *FaktoryAppendJobLogProcessor) Flush(outcome JobOutcome) {
if len(s.logLines) > 0 {
s.logger.Trace().Msg("Sleeping before append job logs ...")
time.Sleep(1 * time.Second)
s.submit()
s.logger.Trace().Msg("Finished append job logs ...")
// The pod is done producing, so the final batch must not be dropped: enqueue
// it with a blocking send (the shipper is still draining) before closing.
if batch := s.takeBatch(); batch != nil {
s.batches <- batch
}
close(s.batches)
<-s.done // wait for in-flight batches to finish enqueuing
if s.droppedBatches > 0 {
s.logger.Warn().Msgf("dropped %d log batch(es) for job '%s' due to enqueue backpressure", s.droppedBatches, s.jobId)
}
}

// takeBatch hands ownership of the buffered lines to the caller and resets the
// processor to an empty buffer (pre-sized to the batch just taken) with a zero
// byte count; see OpsLevelAppendLogProcessor.takeBatch. It returns nil when no
// lines are buffered.
func (s *FaktoryAppendJobLogProcessor) takeBatch() []string {
	pending := len(s.logLines)
	if pending == 0 {
		return nil
	}
	detached := s.logLines
	s.logLines, s.logLinesBytesSize = make([]string, 0, pending), 0
	return detached
}

// submit hands the current batch off to the background shipper. It never blocks;
// see OpsLevelAppendLogProcessor.submit for the rationale.
func (s *FaktoryAppendJobLogProcessor) submit() {
if len(s.logLines) > 0 {
batch := s.takeBatch()
if batch == nil {
return
}
select {
case s.batches <- batch:
default:
s.droppedBatches++
}
}

// ship runs on its own goroutine, enqueuing batches to Faktory so the
// LogStreamer drain loop never blocks on the (network) enqueue call.
func (s *FaktoryAppendJobLogProcessor) ship() {
defer close(s.done)
for logLines := range s.batches {
if len(logLines) == 0 {
continue
}
job := faktory.NewJob("Runners::Faktory::AppendJobLog", opslevel.RunnerAppendJobLogInput{
RunnerId: "faktory",
RunnerJobId: s.jobId,
SentAt: opslevel.NewISO8601DateNow(),
Logs: s.logLines,
Logs: logLines,
})
job.Queue = "app"
batch := s.helper.Bid()
Expand All @@ -98,19 +142,16 @@ func (s *FaktoryAppendJobLogProcessor) submit() {
})
if err != nil {
MetricEnqueueBatchFailed.Inc()
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId)
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(logLines), s.jobId)
}
} else {
err := s.helper.With(func(cl *faktory.Client) error {
return cl.Push(job)
})
if err != nil {
MetricEnqueueFailed.Inc()
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId)
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(logLines), s.jobId)
}
}
}
s.logLinesBytesSize = 0
s.logLines = nil
s.logLines = []string{}
}
21 changes: 18 additions & 3 deletions src/pkg/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ type LogStreamer struct {
logBuffer *ring.Ring
}

func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) LogStreamer {
// NewLogStreamer builds a streamer whose stdout/stderr buffers are each capped
// at bufferMaxBytes. A bufferMaxBytes <= 0 leaves the buffers unbounded (used by
// tests and local `test` runs). Bytes dropped once a buffer is full are counted
// in the MetricLogBytesDropped metric.
func NewLogStreamer(logger zerolog.Logger, bufferMaxBytes int, processors ...LogProcessor) LogStreamer {
quit := make(chan bool)
return LogStreamer{
Stdout: &SafeBuffer{},
Stderr: &SafeBuffer{},
Stdout: NewSafeBuffer(bufferMaxBytes),
Stderr: NewSafeBuffer(bufferMaxBytes),
processors: processors,
logger: logger,
quit: quit,
Expand Down Expand Up @@ -65,6 +69,15 @@ func (s *LogStreamer) processLine(stream logStream) {
s.logBuffer = s.logBuffer.Next()
}

// recordDroppedBytes reads (and thereby resets) the dropped-byte counters of
// the stdout and stderr buffers and adds their sum to MetricLogBytesDropped,
// making log loss observable rather than silent. Nothing is recorded when the
// sum is zero or the metric has not been initialised.
func (s *LogStreamer) recordDroppedBytes() {
	total := s.Stdout.DroppedBytes()
	total += s.Stderr.DroppedBytes()
	if total <= 0 || MetricLogBytesDropped == nil {
		return
	}
	MetricLogBytesDropped.Add(float64(total))
}

func (s *LogStreamer) GetLogBuffer() []string {
output := make([]string, 0)
s.logBuffer.Do(func(line any) {
Expand Down Expand Up @@ -93,6 +106,7 @@ func (s *LogStreamer) Run(ctx context.Context) {
s.processLine(stream)
}
}
s.recordDroppedBytes()
}
}
}
Expand All @@ -119,6 +133,7 @@ done:
for _, stream := range s.streams() {
s.processLine(stream)
}
s.recordDroppedBytes()
s.logger.Trace().Msg("Flushing log processors ...")
for i := len(s.processors) - 1; i >= 0; i-- {
s.processors[i].Flush(outcome)
Expand Down
33 changes: 31 additions & 2 deletions src/pkg/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pkg

import (
"context"
"strings"
"testing"
"time"

Expand All @@ -27,7 +28,7 @@ func (c *captureProcessor) Flush(_ JobOutcome) {}

func TestLogStreamerPartialLineStdout(t *testing.T) {
cap := &captureProcessor{}
s := NewLogStreamer(zerolog.Nop(), cap)
s := NewLogStreamer(zerolog.Nop(), 0, cap)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -46,9 +47,37 @@ func TestLogStreamerPartialLineStdout(t *testing.T) {
autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, cap.lines)
}

// A newline-free write larger than the buffer cap must be truncated to the cap
// (the remainder is dropped), and the retained bytes must be emitted as a
// single line when the job is flushed.
func TestLogStreamerCapsOversizedLine(t *testing.T) {
	const (
		limit   = 64
		payload = 200
	)
	capture := &captureProcessor{}
	streamer := NewLogStreamer(zerolog.Nop(), limit, capture)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go streamer.Run(ctx)

	// 200 bytes, no newline: 64 are buffered, 136 dropped.
	_, _ = streamer.Stdout.Write([]byte(strings.Repeat("x", payload)))
	time.Sleep(150 * time.Millisecond)
	streamer.Flush(JobOutcome{})

	want := []string{strings.Repeat("x", limit)}
	autopilot.Equals(t, want, capture.lines)
}

// SafeBuffer must cap resident memory and report what it dropped: an over-cap
// write still reports full success to the caller, only the leading bytes that
// fit are retained, and the dropped count is handed out exactly once.
func TestSafeBufferCapsAndReportsDrops(t *testing.T) {
	b := NewSafeBuffer(10)
	n, err := b.Write([]byte("0123456789ABCDEF")) // 16 bytes into a 10-byte cap
	if err != nil {
		t.Fatalf("Write reported an error for an over-cap write: %v", err)
	}
	autopilot.Equals(t, 16, n)                // caller always sees a full write
	autopilot.Equals(t, "0123456789", b.String()) // the prefix is kept, the tail dropped
	autopilot.Equals(t, 10, b.Len())
	autopilot.Equals(t, 6, b.DroppedBytes())
	autopilot.Equals(t, 0, b.DroppedBytes()) // counter resets after read
}

func TestLogStreamerPartialLineStderr(t *testing.T) {
cap := &captureProcessor{}
s := NewLogStreamer(zerolog.Nop(), cap)
s := NewLogStreamer(zerolog.Nop(), 0, cap)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
7 changes: 7 additions & 0 deletions src/pkg/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
MetricJobsProcessing prometheus.Gauge
MetricEnqueueFailed prometheus.Counter
MetricEnqueueBatchFailed prometheus.Counter
MetricLogBytesDropped prometheus.Counter
)

func initMetrics(id string) {
Expand Down Expand Up @@ -61,6 +62,12 @@ func initMetrics(id string) {
Help: "The count of jobs that failed to enqueue to faktory for a batch.",
ConstLabels: prometheus.Labels{"runner": id},
})
MetricLogBytesDropped = promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "log_bytes_dropped",
Help: "The count of pod log bytes dropped because a per-stream buffer hit its size cap.",
ConstLabels: prometheus.Labels{"runner": id},
})
}

func StartMetricsServer(id string, port int) {
Expand Down
Loading
Loading