Skip to content
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 164 additions & 65 deletions filters/cache/filter.go

Large diffs are not rendered by default.

311 changes: 296 additions & 15 deletions filters/cache/filter_test.go

Large diffs are not rendered by default.

55 changes: 32 additions & 23 deletions filters/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,23 @@ import (
"github.com/cespare/xxhash/v2"
)

// shardCount is set to 256 to balance lock contention with baseline memory overhead.
// As a power of 2, it allows Go compiler to optimize modulo operations into
// fast bitwise AND operations.
// shardCount balances lock contention against memory overhead.
// Power of 2 lets the compiler reduce modulo to bitwise AND.
const shardCount = 256

// lruItem holds raw []byte instead of Go struct pointers.
// This eliminates Garbage Collector (GC) scanning overhead and allows us to
// enforce strict immutability of cached responses.
// lruItem stores raw bytes to avoid GC scanning of pointer-heavy response structs.
type lruItem struct {
key string
data []byte
size int64
}

// lruShard is a bounded LRU cache protected by a single mutex.
// mu is a standard Mutex (not RWMutex) because LRU reads (Get) modify the
// linked list. Contention is mitigated by sharding.
// Mutex (not RWMutex) is required because Get promotes entries in the list.
type lruShard struct {
// Immutable after construction; not protected by mu.
maxBytes int64
onEvict func() // called once per evicted item; nil = no-op
onEvict func() // called once per evicted item

mu sync.Mutex
currentBytes int64
Expand All @@ -45,22 +41,30 @@ func newLRUShard(maxBytes int64, onEvict func()) *lruShard {
}

func (s *lruShard) set(key string, data []byte) {
evictions := s.setLocked(key, data)
// onEvict is called outside the lock: callbacks may call Bytes() which acquires shard mutexes.
if s.onEvict != nil {
for range evictions {
s.onEvict()
}
}
}

func (s *lruShard) setLocked(key string, data []byte) int {
s.mu.Lock()
defer s.mu.Unlock()

size := int64(len(data))

// If item exceeds shard's capacity, we cannot cache it.
// We must also remove any existing smaller version of this key
// to prevent serving stale or inconsistent data.
// Entry exceeds shard capacity; evict any existing version to avoid serving stale data.
if size > s.maxBytes {
if ele, ok := s.cache[key]; ok {
s.ll.Remove(ele)
item := ele.Value.(*lruItem)
delete(s.cache, key)
s.currentBytes -= item.size
}
return
return 0
}

if ele, ok := s.cache[key]; ok {
Expand All @@ -76,9 +80,13 @@ func (s *lruShard) set(key string, data []byte) {
s.currentBytes += size
}

var evictions int
for s.currentBytes > s.maxBytes {
s.removeOldest()
if s.removeOldest() {
evictions++
}
}
return evictions
}

func (s *lruShard) get(key string) ([]byte, bool) {
Expand Down Expand Up @@ -108,17 +116,18 @@ func (s *lruShard) delete(key string) {
}
}

func (s *lruShard) removeOldest() {
// removeOldest evicts the LRU item. Returns false when the list is empty.
// Caller must invoke onEvict outside the lock.
func (s *lruShard) removeOldest() bool {
ele := s.ll.Back()
if ele != nil {
s.ll.Remove(ele)
item := ele.Value.(*lruItem)
delete(s.cache, item.key)
s.currentBytes -= item.size
if s.onEvict != nil {
s.onEvict()
}
if ele == nil {
return false
}
s.ll.Remove(ele)
item := ele.Value.(*lruItem)
delete(s.cache, item.key)
s.currentBytes -= item.size
return true
}

// ShardedByteLRU manages an array of LRU shards to reduce lock contention
Expand Down
12 changes: 8 additions & 4 deletions filters/cache/lru_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
// It owns all cache semantics (serialisation, TTL expiry); ShardedByteLRU
// remains a pure byte store.
type LRUStorage struct {
lru *ShardedByteLRU
lru *ShardedByteLRU
metrics metrics.Metrics
}

// NewLRUStorage returns an LRUStorage backed by a ShardedByteLRU sized to totalMaxBytes.
func NewLRUStorage(totalMaxBytes int64, onEvict func()) *LRUStorage {
// m records the lru_oversized counter on oversized Set calls; pass metrics.Default when no
// test-scoped collector is needed.
func NewLRUStorage(totalMaxBytes int64, onEvict func(), m metrics.Metrics) *LRUStorage {
return &LRUStorage{
lru: NewShardedByteLRU(totalMaxBytes, onEvict),
lru: NewShardedByteLRU(totalMaxBytes, onEvict),
metrics: m,
}
}

Expand Down Expand Up @@ -59,7 +63,7 @@ func (s *LRUStorage) Set(_ context.Context, key string, entry *Entry) error {
"size_bytes": len(data),
"shard_max": s.lru.shards[0].maxBytes,
}).Warn("cache: entry exceeds shard capacity and will not be stored")
metrics.Default.IncCounter("lru_oversized")
s.metrics.IncCounter("lru_oversized")
return nil
}
s.lru.Set(key, data)
Expand Down
80 changes: 75 additions & 5 deletions filters/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package cache
import (
"context"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"

"github.com/zalando/skipper/metrics"
)

func makeEntry(payload string, ttl time.Duration) *Entry {
Expand All @@ -19,7 +22,7 @@ func makeEntry(payload string, ttl time.Duration) *Entry {
}

func TestLRUStorage_HitAndMiss(t *testing.T) {
s := NewLRUStorage(1<<20, nil) // 1 MB
s := NewLRUStorage(1<<20, nil, metrics.Default)
ctx := context.Background()

got, err := s.Get(ctx, "missing")
Expand Down Expand Up @@ -48,7 +51,7 @@ func TestLRUStorage_HitAndMiss(t *testing.T) {
}

func TestLRUStorage_HardExpiry(t *testing.T) {
s := NewLRUStorage(1<<20, nil)
s := NewLRUStorage(1<<20, nil, metrics.Default)
ctx := context.Background()

entry := makeEntry("stale", time.Millisecond)
Expand All @@ -75,7 +78,7 @@ func TestLRUStorage_HardExpiry(t *testing.T) {
}

func TestLRUStorage_Delete(t *testing.T) {
s := NewLRUStorage(1<<20, nil)
s := NewLRUStorage(1<<20, nil, metrics.Default)
ctx := context.Background()

if err := s.Set(ctx, "del", makeEntry("x", time.Minute)); err != nil {
Expand All @@ -94,7 +97,7 @@ func TestLRUStorage_Delete(t *testing.T) {
func TestLRUStorage_InPlaceUpdate(t *testing.T) {
sample, _ := json.Marshal(makeEntry("v1", time.Minute))
entrySize := int64(len(sample)) + 20
s := NewLRUStorage(entrySize*shardCount, nil)
s := NewLRUStorage(entrySize*shardCount, nil, metrics.Default)
ctx := context.Background()

// Overwrite an existing key — Get must return the new payload.
Expand All @@ -115,7 +118,7 @@ func TestLRUStorage_InPlaceUpdate(t *testing.T) {
}

func TestLRUStorage_ImmutabilityAfterSet(t *testing.T) {
s := NewLRUStorage(1<<20, nil)
s := NewLRUStorage(1<<20, nil, metrics.Default)
ctx := context.Background()

entry := makeEntry("original", time.Minute)
Expand All @@ -135,6 +138,73 @@ func TestLRUStorage_ImmutabilityAfterSet(t *testing.T) {
}
}

func TestLRUStorage_EvictionCallbackDoesNotDeadlock(t *testing.T) {
// Regression: onEvict called Bytes() which re-acquired the shard mutex
// already held by set(), deadlocking the goroutine.
var lru *LRUStorage
lru = NewLRUStorage(1<<20, func() {
// This mirrors the onEvict in NewCacheFilter.
_ = lru.lru.Bytes()
}, metrics.Default)
ctx := context.Background()

// Fill one shard past capacity to force eviction. Each entry is ~100 bytes;
// writing shardCount+1 unique keys guarantees at least one shard overflows.
sample, _ := json.Marshal(makeEntry("x", time.Minute))
entrySize := int64(len(sample)) + 20
// Use a tiny budget so the first two writes to the same shard evict.
lru = NewLRUStorage(entrySize*int64(shardCount), func() {
_ = lru.lru.Bytes()
}, metrics.Default)

done := make(chan struct{})
go func() {
defer close(done)
// Write shardCount+1 distinct keys — guarantees eviction on at least one shard.
for i := range shardCount + 1 {
key := fmt.Sprintf("key-%d", i)
_ = lru.Set(ctx, key, makeEntry("payload", time.Minute))
}
}()

select {
case <-done:
// passed
case <-time.After(5 * time.Second):
t.Fatal("deadlock: Set() did not return within 5 seconds")
}
}

func TestLRUStorage_OversizedEntry(t *testing.T) {
// With 256 shards and 1 KB total capacity, each shard holds 4 bytes.
// A payload larger than 4 bytes exceeds every shard's maxBytes.
const totalBytes = 1024 // 1 KB → 4 bytes per shard
m := &testMetrics{}
s := NewLRUStorage(totalBytes, nil, m)

ctx := context.Background()
entry := makeEntry(string(make([]byte, 1000)), time.Minute) // 1000-byte payload ≫ 4-byte shard

// Set must succeed (nil error) even though the entry is too large to store.
if err := s.Set(ctx, "oversized", entry); err != nil {
t.Fatalf("Set returned unexpected error: %v", err)
}

// The lru_oversized counter must have been incremented exactly once.
if got := m.counter("lru_oversized"); got != 1 {
t.Errorf("lru_oversized counter: got %d, want 1", got)
}

// The entry must not have been stored — Get must return nil.
got, err := s.Get(ctx, "oversized")
if err != nil {
t.Fatalf("Get returned unexpected error: %v", err)
}
if got != nil {
t.Errorf("expected Get to return nil for oversized entry, got %+v", got)
}
}

func TestLRUShard_CrossKeyEviction(t *testing.T) {
// Test lruShard directly to make eviction deterministic — no hash routing.
dataA := []byte("aaaa")
Expand Down
10 changes: 8 additions & 2 deletions filters/cache/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ type Entry struct {
StaleIfError time.Duration
}

// IsStale reports whether the entry is past its TTL but still within the
// stale-while-revalidate window relative to now.
// IsStale reports whether the entry is past its TTL but still within
// the stale-while-revalidate window relative to now.
func (e *Entry) IsStale(now time.Time) bool {
return now.After(e.CreatedAt.Add(e.TTL)) && now.Before(e.CreatedAt.Add(e.TTL+e.StaleWhileRevalidate))
}

// IsUsable reports whether the entry is fresh or within the stale-while-revalidate window.
// Entries past TTL+SWR are retained only for stale-if-error and must not be served.
func (e *Entry) IsUsable(now time.Time) bool {
return now.Before(e.CreatedAt.Add(e.TTL + e.StaleWhileRevalidate))
}

// Storage is the backing store abstraction for cached entries.
// Implementations must be safe for concurrent use.
type Storage interface {
Expand Down
91 changes: 91 additions & 0 deletions filters/cache/valkey_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cache

import (
"context"
"encoding/json"
"fmt"
"time"

log "github.com/sirupsen/logrus"
"github.com/valkey-io/valkey-go"
"github.com/zalando/skipper/metrics"
skpnet "github.com/zalando/skipper/net"
)

// valkeyClient is the subset of skpnet.ValkeyRingClient methods used by ValkeyStorage.
type valkeyClient interface {
Get(ctx context.Context, key string) (string, error)
SetWithExpire(ctx context.Context, key string, value string, expire time.Duration) error
Expire(ctx context.Context, key string, d time.Duration) (int64, error)
}

var _ valkeyClient = (*skpnet.ValkeyRingClient)(nil)

// ValkeyStorage implements Storage using a ValkeyRingClient (L2) with
// automatic fallback to LRUStorage (L1) on any Valkey error.
type ValkeyStorage struct {
ring valkeyClient
l1 *LRUStorage
metrics metrics.Metrics
}

// NewValkeyStorage creates a ValkeyStorage backed by ring (L2) with l1 as the
// fallback in-memory cache. m is used to record per-operation counters:
//
// - valkey_miss — clean cache miss (key not found in Valkey)
// - valkey_get_fallback — Valkey error on Get; L1 was consulted instead
// - valkey_set_fallback — Valkey error on Set; L1 was written instead
//
// Pass metrics.Default when no test-scoped metrics collector is needed.
func NewValkeyStorage(ring *skpnet.ValkeyRingClient, l1 *LRUStorage, m metrics.Metrics) *ValkeyStorage {
return &ValkeyStorage{ring: ring, l1: l1, metrics: m}
}

func (s *ValkeyStorage) Get(ctx context.Context, key string) (*Entry, error) {
data, err := s.ring.Get(ctx, key)
if err != nil {
if valkey.IsValkeyNil(err) {
s.metrics.IncCounter("valkey_miss")
return nil, nil
}
s.metrics.IncCounter("valkey_get_fallback")
log.WithError(err).Warn("cache: valkey Get failed, falling back to L1")
return s.l1.Get(ctx, key)
}
var e Entry
if err := json.Unmarshal([]byte(data), &e); err != nil {
return nil, fmt.Errorf("cache: decode valkey entry: %w", err)
}
return &e, nil
}

func (s *ValkeyStorage) Set(ctx context.Context, key string, entry *Entry) error {
data, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("cache: encode valkey entry: %w", err)
}

ttl := entry.TTL + max(entry.StaleIfError, entry.StaleWhileRevalidate)
if ttl <= 0 {
ttl = time.Minute
}

if err := s.ring.SetWithExpire(ctx, key, string(data), ttl); err != nil {
s.metrics.IncCounter("valkey_set_fallback")
log.WithError(err).Warn("cache: valkey Set failed, falling back to L1")
return s.l1.Set(ctx, key, entry)
}
// Write-around: L1 is not warmed on a successful Valkey Set. Subsequent
// Valkey hits skip L1 entirely; L1 is only populated on Valkey failures.
return nil
}

func (s *ValkeyStorage) Delete(ctx context.Context, key string) error {
// ValkeyRingClient exposes no DEL; use EXPIRE key -1 (immediate deletion per Valkey docs).
// -1*time.Second is required: time.Duration(-1) is -1ns, which truncates to EXPIRE key 0.
// Valkey errors are best-effort — L1 delete always runs.
if _, err := s.ring.Expire(ctx, key, -1*time.Second); err != nil {
log.WithError(err).Warn("cache: valkey Delete failed")
}
return s.l1.Delete(ctx, key)
}
Loading
Loading