diff --git a/.changeset/event-log-lru-cache.md b/.changeset/event-log-lru-cache.md new file mode 100644 index 0000000000..807cc363d6 --- /dev/null +++ b/.changeset/event-log-lru-cache.md @@ -0,0 +1,6 @@ +--- +'@workflow/core': minor +'workflow': minor +--- + +Add a process-wide LRU cache for workflow event logs so warm function instances delta-fetch only new events on resume instead of reloading from event 0 on every invocation. Disable with `WORKFLOW_DISABLE_EVENT_CACHE=1`. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 796e29e699..06b108166b 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -21,6 +21,13 @@ import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js'; import { + deleteCachedRunEvents, + getCachedRunEvents, + isEventCacheEnabled, + setCachedRunEvents, +} from './runtime/event-cache.js'; +import { + appendUniqueEvents, getQueueOverhead, getWorkflowQueueName, handleHealthCheckMessage, @@ -674,16 +681,52 @@ export function workflowEntrypoint( // The server always returns a cursor when there are events (even on the // final page), so we can reliably use it for incremental loading. let events: Event[]; + let crossInvocationCacheHit = false; if (cachedEvents === null) { - // First iteration: use preloaded events if available, - // otherwise do a full load with cursor. + // First iteration: prefer preloaded events from + // run_started, then the cross-invocation cache + // (process-wide LRU keyed by runId), then fall + // back to a cold full load. if (preloadedEvents) { events = preloadedEvents; eventsCursor = preloadedEventsCursor ?? 
null; } else { - const loaded = await loadWorkflowRunEvents(runId); - events = loaded.events; - eventsCursor = loaded.cursor; + const crossCached = getCachedRunEvents(runId); + if (crossCached && crossCached.cursor) { + // HOT PATH: delta-fetch only events created + // after the cached cursor. We always delta + // against the server (never serve purely from + // cache) so we stay correct against writers on + // other instances. The shouldRetryWithoutEventCursor + // path in loadWorkflowRunEvents transparently + // recovers if the cached cursor is stale (400). + const delta = await loadWorkflowRunEvents( + runId, + crossCached.cursor + ); + // Copy-on-read: never share the cache's array + // with the runtime, which mutates `events` on + // the wait_completed merge path. + const merged = [...crossCached.events]; + const seen = new Set(merged.map((e) => e.eventId)); + appendUniqueEvents(merged, seen, delta.events); + events = merged; + eventsCursor = delta.cursor ?? crossCached.cursor; + crossInvocationCacheHit = true; + runtimeLogger.debug( + 'Event cache hit; delta-loaded events', + { + workflowRunId: runId, + cachedCount: crossCached.events.length, + deltaCount: delta.events.length, + totalCount: events.length, + } + ); + } else { + const loaded = await loadWorkflowRunEvents(runId); + events = loaded.events; + eventsCursor = loaded.cursor; + } } } else if (eventsCursor) { // Subsequent iteration: fetch only new events since last cursor @@ -766,6 +809,11 @@ export function workflowEntrypoint( eventType: terminalRunEvent.eventType, } ); + // Run is terminal; free the cached events early. + // LRU eviction would handle this passively, but + // explicit eviction frees memory sooner for the + // common case of long-lived warm instances. 
+ deleteCachedRunEvents(runId); return; } @@ -870,6 +918,24 @@ export function workflowEntrypoint( // Update cache reference (may have been set for first time) cachedEvents = events; + // Update the cross-invocation event cache so the + // next resume on this warm instance can delta-fetch + // from this cursor instead of doing a full reload. + // Hand the cache its own copy of the array — the + // runtime continues to mutate `events` (e.g. on the + // wait_completed merge path) and shared ownership + // would corrupt the cached state. + if (eventsCursor && isEventCacheEnabled()) { + setCachedRunEvents(runId, { + events: [...events], + cursor: eventsCursor, + }); + } + + span?.setAttributes({ + 'workflow.events.cache_hit': crossInvocationCacheHit, + }); + // Replay workflow runtimeLogger.debug('Starting workflow replay', { workflowRunId: runId, @@ -917,6 +983,8 @@ export function workflowEntrypoint( span?.setAttributes({ ...Attribute.WorkflowRunStatus('completed'), }); + // Run is terminal; free cached events eagerly. + deleteCachedRunEvents(runId); return; } catch (err) { if (WorkflowSuspension.is(err)) { @@ -1259,6 +1327,8 @@ export function workflowEntrypoint( ...Attribute.WorkflowErrorMessage(errorMessage), ...Attribute.ErrorType(errorName), }); + // Run is terminal; free cached events eagerly. 
import type { Event } from '@workflow/world';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import {
  clearEventCache,
  deleteCachedRunEvents,
  getCachedRunEvents,
  getEventCacheStats,
  isEventCacheEnabled,
  setCachedRunEvents,
} from './event-cache.js';

/**
 * Builds a minimal event fixture. Only `eventId` matters to the cache; the
 * remaining fields exist so the object resembles a real `step_created` event.
 */
function makeEvent(eventId: string): Event {
  return {
    eventId,
    runId: 'wrun_test',
    eventType: 'step_created',
    correlationId: 'step_x',
    createdAt: new Date(),
  } as unknown as Event;
}

/** Builds `count` fixtures with ids `${prefix}_0` … `${prefix}_${count - 1}`. */
function makeEvents(count: number, prefix = 'evt'): Event[] {
  return Array.from({ length: count }, (_, i) => makeEvent(`${prefix}_${i}`));
}

/**
 * Builds an event list whose estimated size is roughly `fraction` of the
 * configured byte budget. Sizing from `maxSizeBytes` (instead of a
 * hard-coded event count) keeps the eviction tests valid when the budget is
 * tuned through the environment. The divisor slightly over-estimates the
 * cache's per-event cost (512 bytes + eventId length), so the result stays
 * at or just under the requested fraction.
 */
function makeEventsForBudgetFraction(fraction: number, prefix: string): Event[] {
  const { maxSizeBytes } = getEventCacheStats();
  return makeEvents(Math.floor((fraction * maxSizeBytes) / 520), prefix);
}

describe('event-cache', () => {
  beforeEach(() => {
    // Start every test with the kill switch off, even if the surrounding
    // environment exported it.
    delete process.env.WORKFLOW_DISABLE_EVENT_CACHE;
    clearEventCache();
  });

  afterEach(() => {
    clearEventCache();
    delete process.env.WORKFLOW_DISABLE_EVENT_CACHE;
  });

  describe('get/set/delete round-trip', () => {
    it('returns undefined for an unknown runId', () => {
      expect(getCachedRunEvents('wrun_unknown')).toBeUndefined();
    });

    it('stores and retrieves an entry', () => {
      const events = makeEvents(3);
      setCachedRunEvents('wrun_a', { events, cursor: 'cursor_a' });
      const cached = getCachedRunEvents('wrun_a');
      expect(cached).toBeDefined();
      expect(cached?.events).toBe(events);
      expect(cached?.cursor).toBe('cursor_a');
    });

    it('overwrites an existing entry', () => {
      setCachedRunEvents('wrun_a', {
        events: makeEvents(3),
        cursor: 'c1',
      });
      const newEvents = makeEvents(5, 'new');
      setCachedRunEvents('wrun_a', { events: newEvents, cursor: 'c2' });
      const cached = getCachedRunEvents('wrun_a');
      expect(cached?.events).toBe(newEvents);
      expect(cached?.cursor).toBe('c2');
    });

    it('deletes an entry', () => {
      setCachedRunEvents('wrun_a', { events: makeEvents(2), cursor: 'c' });
      expect(getCachedRunEvents('wrun_a')).toBeDefined();
      deleteCachedRunEvents('wrun_a');
      expect(getCachedRunEvents('wrun_a')).toBeUndefined();
    });

    it('handles deletion of an unknown runId gracefully', () => {
      expect(() => deleteCachedRunEvents('wrun_nope')).not.toThrow();
    });

    it('supports a null cursor (still cached, but caller may skip on null)', () => {
      setCachedRunEvents('wrun_a', { events: makeEvents(2), cursor: null });
      const cached = getCachedRunEvents('wrun_a');
      expect(cached).toBeDefined();
      expect(cached?.cursor).toBeNull();
    });
  });

  describe('LRU ordering', () => {
    it('touches an entry on get so it is not the next evicted', () => {
      // Two entries fit in the byte budget, three do not. After touching
      // wrun_a, wrun_b is the least-recently-used entry and must be the
      // one evicted when wrun_c arrives.
      setCachedRunEvents('wrun_a', {
        events: makeEventsForBudgetFraction(0.4, 'a'),
        cursor: 'a',
      });
      setCachedRunEvents('wrun_b', {
        events: makeEventsForBudgetFraction(0.4, 'b'),
        cursor: 'b',
      });

      // Touch wrun_a so it becomes most-recently-used.
      expect(getCachedRunEvents('wrun_a')).toBeDefined();

      setCachedRunEvents('wrun_c', {
        events: makeEventsForBudgetFraction(0.4, 'c'),
        cursor: 'c',
      });

      expect(getCachedRunEvents('wrun_b')).toBeUndefined();
      expect(getCachedRunEvents('wrun_a')).toBeDefined();
      expect(getCachedRunEvents('wrun_c')).toBeDefined();

      const stats = getEventCacheStats();
      expect(stats.entryCount).toBe(2);
      expect(stats.evictionCount).toBe(1);
    });

    it('evicts the least-recently-used entry when the byte budget is exceeded', () => {
      // Each entry is ~60% of the budget, so the second insertion pushes
      // the total over the limit and the first entry must go.
      setCachedRunEvents('wrun_a', {
        events: makeEventsForBudgetFraction(0.6, 'a'),
        cursor: 'a',
      });
      expect(getCachedRunEvents('wrun_a')).toBeDefined();

      setCachedRunEvents('wrun_b', {
        events: makeEventsForBudgetFraction(0.6, 'b'),
        cursor: 'b',
      });
      // wrun_a should now be evicted to make room for wrun_b.
      expect(getCachedRunEvents('wrun_a')).toBeUndefined();
      expect(getCachedRunEvents('wrun_b')).toBeDefined();

      const stats = getEventCacheStats();
      expect(stats.evictionCount).toBeGreaterThanOrEqual(1);
    });

    it('updates totalSize on overwrite', () => {
      setCachedRunEvents('wrun_a', {
        events: makeEvents(10),
        cursor: 'a',
      });
      const before = getEventCacheStats().totalSize;
      setCachedRunEvents('wrun_a', {
        events: makeEvents(100),
        cursor: 'a',
      });
      const after = getEventCacheStats().totalSize;
      expect(after).toBeGreaterThan(before);
    });

    it('updates totalSize on delete', () => {
      setCachedRunEvents('wrun_a', {
        events: makeEvents(10),
        cursor: 'a',
      });
      const before = getEventCacheStats().totalSize;
      expect(before).toBeGreaterThan(0);
      deleteCachedRunEvents('wrun_a');
      expect(getEventCacheStats().totalSize).toBe(0);
    });
  });

  describe('clearEventCache', () => {
    it('removes all entries and resets totalSize', () => {
      setCachedRunEvents('wrun_a', { events: makeEvents(5), cursor: 'a' });
      setCachedRunEvents('wrun_b', { events: makeEvents(5), cursor: 'b' });
      expect(getEventCacheStats().entryCount).toBe(2);

      clearEventCache();

      expect(getCachedRunEvents('wrun_a')).toBeUndefined();
      expect(getCachedRunEvents('wrun_b')).toBeUndefined();
      const stats = getEventCacheStats();
      expect(stats.entryCount).toBe(0);
      expect(stats.totalSize).toBe(0);
      expect(stats.evictionCount).toBe(0);
    });
  });

  describe('feature flag', () => {
    it('is enabled by default', () => {
      expect(isEventCacheEnabled()).toBe(true);
    });

    it('is disabled when WORKFLOW_DISABLE_EVENT_CACHE=1', () => {
      process.env.WORKFLOW_DISABLE_EVENT_CACHE = '1';
      expect(isEventCacheEnabled()).toBe(false);
    });

    it('returns undefined from get() when disabled', () => {
      setCachedRunEvents('wrun_a', { events: makeEvents(2), cursor: 'a' });
      process.env.WORKFLOW_DISABLE_EVENT_CACHE = '1';
      expect(getCachedRunEvents('wrun_a')).toBeUndefined();
    });

    it('skips set() when disabled', () => {
      process.env.WORKFLOW_DISABLE_EVENT_CACHE = '1';
      setCachedRunEvents('wrun_a', { events: makeEvents(2), cursor: 'a' });
      delete process.env.WORKFLOW_DISABLE_EVENT_CACHE;
      // After re-enabling the cache, the entry should not exist
      // because the set() never landed.
      expect(getCachedRunEvents('wrun_a')).toBeUndefined();
    });
  });

  describe('size estimation', () => {
    it('reports a non-zero totalSize for cached entries', () => {
      setCachedRunEvents('wrun_a', {
        events: makeEvents(10),
        cursor: 'a',
      });
      expect(getEventCacheStats().totalSize).toBeGreaterThan(0);
    });

    it('reports zero totalSize for an empty events array', () => {
      setCachedRunEvents('wrun_a', { events: [], cursor: 'a' });
      expect(getEventCacheStats().totalSize).toBe(0);
    });
  });
});
/**
 * Per-run event-log LRU cache for the workflow runtime.
 *
 * On warm function instances, this cache lets a workflow **resume** start
 * from the events that were already loaded on a previous invocation and
 * delta-fetch only the new events created since, instead of re-reading the
 * full event log from event 0 on every resume.
 *
 * The cache is process-wide (module-level singleton) and bounded by an
 * approximate byte count, with a secondary entry-count cap as a guard
 * against pathological insertion patterns.
 *
 * Correctness contract:
 *
 * - The runtime **must** treat the returned array as read-only and work on
 *   a shallow copy when it intends to mutate.
 * - The runtime **must** always delta-fetch from the cached cursor on a
 *   cache hit; entries are never a substitute for asking the server.
 * - Entries whose cursor is `null` are still stored and returned; callers
 *   must check `cursor` themselves and fall back to a full reload.
 *
 * Setting `WORKFLOW_DISABLE_EVENT_CACHE=1` turns `get`/`set` into no-ops;
 * callers can gate hot-path work on `isEventCacheEnabled()`.
 */

import type { Event } from '@workflow/world';

export interface CachedRunEvents {
  /** Full, ordered, deduped log loaded/replayed so far. */
  events: Event[];
  /** Last non-null cursor returned by the server, or null if unknown. */
  cursor: string | null;
}

interface InternalEntry {
  value: CachedRunEvents;
  /** Approximate byte size for LRU bookkeeping. */
  size: number;
}

// Default to ~64 MiB total cache size (tunable via env var) and a hard cap
// of 500 entries as a secondary guard.
const DEFAULT_MAX_SIZE_BYTES = 64 * 1024 * 1024;
const DEFAULT_MAX_ENTRIES = 500;

/**
 * Parses a strictly-decimal positive integer from an env-style string.
 *
 * Returns `undefined` (so the caller falls back to its default) for missing,
 * empty, zero, negative, or malformed input. Unlike `Number.parseInt`,
 * trailing garbage is rejected: `"64mb"` must not silently become a 64-byte
 * budget, and `"1e6"` must not become `1`.
 */
function parsePositiveInt(value: string | undefined): number | undefined {
  if (value === undefined) return undefined;
  const trimmed = value.trim();
  if (!/^\+?\d+$/.test(trimmed)) return undefined;
  const n = Number(trimmed);
  return Number.isSafeInteger(n) && n > 0 ? n : undefined;
}

const MAX_SIZE_BYTES =
  parsePositiveInt(process.env.WORKFLOW_EVENT_CACHE_MAX_BYTES) ??
  DEFAULT_MAX_SIZE_BYTES;
const MAX_ENTRIES =
  parsePositiveInt(process.env.WORKFLOW_EVENT_CACHE_MAX_ENTRIES) ??
  DEFAULT_MAX_ENTRIES;

/** Sum of the estimated sizes of all entries currently in `cache`. */
let totalSize = 0;
/** Number of entries dropped because of budget pressure since the last clear. */
let evictionCount = 0;

// Insertion-order iteration of `Map` gives us LRU semantics for free:
// re-set an existing key to move it to the most-recently-used position,
// and iterate from the front to find the least-recently-used entry.
const cache = new Map<string, InternalEntry>();

/**
 * Per-event constant used by the size heuristic. Real events can carry
 * payloads far larger than this, so `totalSize` is an estimate used only
 * for eviction decisions, not a measurement of actual memory use.
 */
const APPROX_BYTES_PER_EVENT = 512;

/**
 * Cheap size estimate for an event list: a constant per event plus the
 * length of its `eventId`. Deliberately avoids serializing the events.
 */
function estimateSize(events: Event[]): number {
  let bytes = 0;
  for (const e of events) {
    bytes += APPROX_BYTES_PER_EVENT + (e.eventId?.length ?? 0);
  }
  return bytes;
}

/**
 * Drops least-recently-used entries until both the entry-count cap and the
 * byte budget are satisfied.
 */
function evictIfNeeded(): void {
  while (cache.size > MAX_ENTRIES || totalSize > MAX_SIZE_BYTES) {
    // Map iteration is insertion-order; the first entry is the
    // least-recently-used one.
    const oldest = cache.entries().next();
    if (oldest.done) break;
    const [oldestKey, oldestEntry] = oldest.value;
    totalSize -= oldestEntry.size;
    cache.delete(oldestKey);
    evictionCount++;
  }
}

/**
 * Returns whether the cache is enabled. Set `WORKFLOW_DISABLE_EVENT_CACHE=1`
 * in the environment to disable it (kill switch for production). The
 * variable is read on every call, so it can be flipped at runtime.
 */
export function isEventCacheEnabled(): boolean {
  return process.env.WORKFLOW_DISABLE_EVENT_CACHE !== '1';
}

/**
 * Returns the cached entry for `runId`, or `undefined` if the cache is
 * disabled or the run is not cached. A hit marks the entry as
 * most-recently-used.
 *
 * Entries with a `null` cursor ARE returned; callers that need a cursor to
 * delta-fetch must check `cursor` and take the cold-load path themselves.
 *
 * The returned array is shared with the cache and must be treated as
 * read-only. Callers that intend to mutate (push events, etc.) should
 * make a shallow copy first.
 */
export function getCachedRunEvents(runId: string): CachedRunEvents | undefined {
  if (!isEventCacheEnabled()) {
    return undefined;
  }
  const entry = cache.get(runId);
  if (!entry) {
    return undefined;
  }
  // Touch the entry: delete + re-set moves it to the tail of the Map's
  // insertion order, i.e. the most-recently-used position.
  cache.delete(runId);
  cache.set(runId, entry);
  return entry.value;
}

/**
 * Stores or replaces the cache entry for a run. No-op when the cache is
 * disabled.
 *
 * The caller hands ownership of `value.events` to the cache and must not
 * mutate it afterwards (pass `{ events: [...events], ... }`).
 *
 * An entry whose estimated size alone exceeds the byte budget is not
 * stored. Inserting it would only evict every other run before evicting
 * the new entry as well, leaving the cache empty. Any previous entry for
 * the same run is still removed, so a later `get` never returns data older
 * than the most recent `set`.
 */
export function setCachedRunEvents(
  runId: string,
  value: CachedRunEvents
): void {
  if (!isEventCacheEnabled()) {
    return;
  }
  // Remove the previous entry (if any) first so totalSize stays accurate
  // and the new entry lands in the most-recently-used position.
  deleteCachedRunEvents(runId);
  const size = estimateSize(value.events);
  if (size > MAX_SIZE_BYTES) {
    // Counted as an eviction so budget pressure stays visible in stats.
    evictionCount++;
    return;
  }
  cache.set(runId, { value, size });
  totalSize += size;
  evictIfNeeded();
}

/**
 * Removes the cached entry for a run, if present. Works even when the
 * cache is disabled, so memory can always be released. Use this to free
 * memory eagerly when a run reaches a terminal state.
 */
export function deleteCachedRunEvents(runId: string): void {
  const existing = cache.get(runId);
  if (existing) {
    totalSize -= existing.size;
    cache.delete(runId);
  }
}

/**
 * Clears the entire cache and resets the size and eviction counters.
 * Intended for tests; production code should not need this.
 */
export function clearEventCache(): void {
  cache.clear();
  totalSize = 0;
  evictionCount = 0;
}

/**
 * Returns a snapshot of cache statistics and the effective limits.
 * Intended for tests and observability.
 */
export function getEventCacheStats(): {
  entryCount: number;
  totalSize: number;
  evictionCount: number;
  maxSizeBytes: number;
  maxEntries: number;
} {
  return {
    entryCount: cache.size,
    totalSize,
    evictionCount,
    maxSizeBytes: MAX_SIZE_BYTES,
    maxEntries: MAX_ENTRIES,
  };
}