Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-local-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Reduce local sequential-step replay I/O with bounded recent-event and storage-directory caches.
44 changes: 44 additions & 0 deletions packages/world-local/src/fs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import {
import { z } from 'zod';
import {
assertSafeEntityId,
clearCreatedFilesCache,
ensureDir,
paginatedFileSystemQuery,
readFirstByte,
readJSONWithFallback,
resolveWithinBase,
taggedPath,
UnsafeEntityIdError,
ulidToDate,
writeExclusive,
writeJSON,
} from './fs.js';

Expand Down Expand Up @@ -113,6 +116,47 @@ describe('fs utilities', () => {
});
});

describe('ensureDir', () => {
it('does not repeat mkdir for a directory created by this process', async () => {
clearCreatedFilesCache();
const nestedDir = path.join(testDir, 'events');
const mkdirSpy = vi.spyOn(fs, 'mkdir');

await ensureDir(nestedDir);
await ensureDir(nestedDir);

expect(
mkdirSpy.mock.calls.filter(([dirPath]) => dirPath === nestedDir)
).toHaveLength(1);
});

it('recreates a cached directory removed before an atomic write', async () => {
clearCreatedFilesCache();
const eventsDir = path.join(testDir, 'events');
const firstPath = path.join(eventsDir, 'first.json');
const secondPath = path.join(eventsDir, 'second.json');

await writeJSON(firstPath, { value: 'first' });
await fs.rm(eventsDir, { recursive: true, force: true });
await writeJSON(secondPath, { value: 'second' });

expect(JSON.parse(await fs.readFile(secondPath, 'utf8'))).toEqual({
value: 'second',
});
});

it('recreates a cached directory removed before an exclusive write', async () => {
clearCreatedFilesCache();
const locksDir = path.join(testDir, '.locks');

expect(await writeExclusive(path.join(locksDir, 'first'), '')).toBe(true);
await fs.rm(locksDir, { recursive: true, force: true });
expect(await writeExclusive(path.join(locksDir, 'second'), '')).toBe(
true
);
});
});

describe('paginatedFileSystemQuery', () => {
// Simple getCreatedAt function that strips .json and tries to parse as ULID
const getCreatedAt = (filename: string): Date | null => {
Expand Down
62 changes: 52 additions & 10 deletions packages/world-local/src/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ async function withWindowsRetry<T>(
// In-memory cache of created files to avoid expensive fs.access() calls
// This is safe because we only write once per file path (no overwrites without explicit flag)
const createdFilesCache = new Set<string>();
// Writes repeatedly target a small fixed set of entity directories. Once one
// exists in this process, avoid another recursive mkdir syscall per event.
const createdDirectoriesCache = new Set<string>();

/**
* Clear the created files cache. Useful for testing or when files are deleted externally.
* Clear write-path caches. Useful for testing or when files are deleted externally.
*/
export function clearCreatedFilesCache(): void {
createdFilesCache.clear();
createdDirectoriesCache.clear();
}

export { ulidToDate } from '@workflow/world';
Expand Down Expand Up @@ -252,13 +256,38 @@ export async function listTaggedFilesByExtension(
}

export async function ensureDir(dirPath: string): Promise<void> {
const resolvedPath = path.resolve(dirPath);
if (createdDirectoriesCache.has(resolvedPath)) {
return;
}
try {
await fs.mkdir(dirPath, { recursive: true });
await fs.mkdir(resolvedPath, { recursive: true });
createdDirectoriesCache.add(resolvedPath);
} catch (_error) {
// Ignore if already exists
}
}

async function withEnsuredDirectory<T>(
dirPath: string,
operation: () => Promise<T>
): Promise<T> {
await ensureDir(dirPath);
try {
return await operation();
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}

// A dev server may outlive an external cleanup of its data directory.
// Forget the cached directory and retry once after recreating it.
createdDirectoriesCache.delete(path.resolve(dirPath));
await ensureDir(dirPath);
return operation();
}
}

interface WriteOptions {
overwrite?: boolean;
}
Expand Down Expand Up @@ -342,10 +371,11 @@ export async function write(
const tempPath = `${filePath}.tmp.${ulid()}`;
let tempFileCreated = false;
try {
await ensureDir(path.dirname(filePath));
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
});
// Track this file in cache so future writes know it exists
createdFilesCache.add(filePath);
} catch (error) {
Expand Down Expand Up @@ -405,10 +435,11 @@ export async function writeExclusive(
filePath: string,
data: string
): Promise<boolean> {
await ensureDir(path.dirname(filePath));
try {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
return await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
});
} catch (error: any) {
if (error.code === 'EEXIST') {
return false;
Expand Down Expand Up @@ -439,6 +470,12 @@ export async function listFilesByExtension(
interface PaginatedFileSystemQueryConfig<T> {
directory: string;
schema: z.ZodType<T>;
/**
* Optional immutable-item cache, keyed by absolute file path. Event files
* are append-only, so world-local can replay events it just persisted
* without rereading and reparsing them from disk.
*/
cachedItems?: ReadonlyMap<string, T>;
filePrefix?: string;
fileIdFilter?: (fileId: string) => boolean;
filter?: (item: T) => boolean;
Expand Down Expand Up @@ -474,6 +511,7 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const {
directory,
schema,
cachedItems,
filePrefix,
fileIdFilter,
filter,
Expand Down Expand Up @@ -544,7 +582,11 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const filePath = path.join(directory, `${fileId}.json`);
Comment thread
pranaygp marked this conversation as resolved.
Outdated
Comment thread
pranaygp marked this conversation as resolved.
Outdated
let item: T | null = null;
try {
item = await readJSON(filePath, schema);
const cachedItem = cachedItems?.get(filePath);
Comment thread
pranaygp marked this conversation as resolved.
item =
cachedItem === undefined
? await readJSON(filePath, schema)
: structuredClone(cachedItem);
} catch (error: unknown) {
// We don't expect zod errors to happen, but if the JSON does get malformed,
// we skip the item. Preferably, we'd have a way to mark items as malformed,
Expand Down
9 changes: 8 additions & 1 deletion packages/world-local/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
const mergedConfig = { ...config.value, ...definedArgs };
const tag = mergedConfig.tag;
const queue = createQueue(mergedConfig);
const storage = createStorage(mergedConfig.dataDir, tag);
const { clearCache: clearStorageCache, ...storage } = createStorage(
mergedConfig.dataDir,
tag
);
const recoverActiveRuns = mergedConfig.recoverActiveRuns ?? true;
return {
specVersion: SPEC_VERSION_CURRENT,
Expand Down Expand Up @@ -94,9 +97,11 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
await reenqueueActiveRuns(recoveryRuns, queue.queue, 'world-local');
},
async close() {
clearStorageCache();
await queue.close();
},
async clear() {
clearStorageCache();
if (tag) {
// Selectively delete only files matching this tag
const basedir = mergedConfig.dataDir;
Expand Down Expand Up @@ -159,6 +164,8 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
// Clear the in-memory write cache so deleted paths are forgotten
clearCreatedFilesCache();
} else {
// `rm()` removes directories that the write path may have cached.
clearCreatedFilesCache();
await rm(mergedConfig.dataDir, { recursive: true, force: true });
await initDataDir(mergedConfig.dataDir);
}
Expand Down
105 changes: 104 additions & 1 deletion packages/world-local/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { WorkflowWorldError } from '@workflow/errors';
import type { Event, Storage } from '@workflow/world';
import { stripEventDataRefs } from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { writeJSON } from './fs.js';
import { hashToken } from './storage/helpers.js';
import { createStorage } from './storage.js';
Expand Down Expand Up @@ -1301,6 +1301,109 @@ describe('Storage', () => {
expect(result.data[3].eventType).toBe('hook_disposed');
});
});

Comment thread
pranaygp marked this conversation as resolved.
it('reuses locally appended events without exposing cached instances', async () => {
Comment thread
pranaygp marked this conversation as resolved.
const created = await storage.events.create(null, {
eventType: 'run_created',
eventData: {
deploymentId: 'deployment-cache',
workflowName: 'cached-event-workflow',
input: new Uint8Array([1]),
},
});
const runId = created.event.runId;
(created.event as any).eventData.input[0] = 9;
const readFileSpy = vi.spyOn(fs, 'readFile');

const first = await storage.events.list({ runId });
const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) =>
String(filePath).includes(`${path.sep}events${path.sep}`)
);
expect(eventFileReads).toHaveLength(0);
expect((first.data[0] as any).eventData.input).toEqual(
new Uint8Array([1])
);

(first.data[0] as { eventType: string }).eventType = 'run_failed';
const second = await storage.events.list({ runId });
expect(second.data[0]?.eventType).toBe('run_created');
});

it('reads oversized event payloads from disk instead of retaining them', async () => {
Comment thread
pranaygp marked this conversation as resolved.
const created = await storage.events.create(null, {
eventType: 'run_created',
eventData: {
deploymentId: 'deployment-large',
workflowName: 'large-event-workflow',
input: new Uint8Array(4 * 1024 * 1024),
},
});
const readFileSpy = vi.spyOn(fs, 'readFile');

await storage.events.list({ runId: created.event.runId });

const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) =>
String(filePath).includes(`${path.sep}events${path.sep}`)
);
expect(eventFileReads.length).toBeGreaterThan(0);
});

it('normalizes cached event metadata the same way as disk reads', async () => {
const created = await storage.events.create(null, {
eventType: 'run_created',
eventData: {
deploymentId: 'deployment-normalized',
workflowName: 'normalized-cache-workflow',
input: new Uint8Array([1]),
executionContext: {
timestamp: new Date('2026-01-01T00:00:00.000Z'),
},
},
});

const page = await storage.events.list({ runId: created.event.runId });

expect((page.data[0] as any).eventData.executionContext.timestamp).toBe(
'2026-01-01T00:00:00.000Z'
);
});

it('allows active-event cache contents to be explicitly released', async () => {
const localStorage = createStorage(testDir);
const run = await createRun(localStorage, {
deploymentId: 'deployment-clear',
workflowName: 'cleared-cache-workflow',
input: new Uint8Array([1]),
});
localStorage.clearCache();
const readFileSpy = vi.spyOn(fs, 'readFile');

await localStorage.events.list({ runId: run.runId });

const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) =>
String(filePath).includes(`${path.sep}events${path.sep}`)
);
expect(eventFileReads.length).toBeGreaterThan(0);
});

it('releases locally cached events after a run completes', async () => {
const run = await createRun(storage, {
deploymentId: 'deployment-complete',
workflowName: 'completed-cache-workflow',
input: new Uint8Array([1]),
});
await updateRun(storage, run.runId, 'run_completed', {
output: new Uint8Array([2]),
});
const readFileSpy = vi.spyOn(fs, 'readFile');

await storage.events.list({ runId: run.runId });

const eventFileReads = readFileSpy.mock.calls.filter(([filePath]) =>
String(filePath).includes(`${path.sep}events${path.sep}`)
);
expect(eventFileReads.length).toBeGreaterThan(0);
});
});

describe('hooks', () => {
Expand Down
Loading
Loading