Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-local-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Reduce local sequential-step replay I/O with bounded recent-event and storage-directory caches.
44 changes: 44 additions & 0 deletions packages/world-local/src/fs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ import {
import { z } from 'zod';
import {
assertSafeEntityId,
clearCreatedFilesCache,
ensureDir,
paginatedFileSystemQuery,
readFirstByte,
readJSONWithFallback,
resolveWithinBase,
taggedPath,
UnsafeEntityIdError,
ulidToDate,
writeExclusive,
writeJSON,
} from './fs.js';

Expand Down Expand Up @@ -113,6 +116,47 @@ describe('fs utilities', () => {
});
});

// Covers the in-process directory cache used by ensureDir, and the recovery
// path taken by the write helpers when a cached directory has been removed.
describe('ensureDir', () => {
it('does not repeat mkdir for a directory created by this process', async () => {
// Start from empty caches so the first ensureDir call has to reach fs.mkdir.
clearCreatedFilesCache();
const nestedDir = path.join(testDir, 'events');
const mkdirSpy = vi.spyOn(fs, 'mkdir');

// The second call should be answered from the cache without another mkdir.
await ensureDir(nestedDir);
await ensureDir(nestedDir);

// Only mkdir calls for this exact directory are counted.
// NOTE(review): ensureDir hands mkdir the path.resolve()d path, so this
// equality presumably relies on testDir being absolute — confirm.
expect(
mkdirSpy.mock.calls.filter(([dirPath]) => dirPath === nestedDir)
).toHaveLength(1);
});

it('recreates a cached directory removed before an atomic write', async () => {
clearCreatedFilesCache();
const eventsDir = path.join(testDir, 'events');
const firstPath = path.join(eventsDir, 'first.json');
const secondPath = path.join(eventsDir, 'second.json');

// The first write creates eventsDir; deleting it afterwards simulates an
// external cleanup that the process-local cache does not know about.
await writeJSON(firstPath, { value: 'first' });
await fs.rm(eventsDir, { recursive: true, force: true });
// The second write must still succeed even though the directory is gone.
await writeJSON(secondPath, { value: 'second' });

// Read the file back from disk to prove the second write really landed.
expect(JSON.parse(await fs.readFile(secondPath, 'utf8'))).toEqual({
value: 'second',
});
});

it('recreates a cached directory removed before an exclusive write', async () => {
clearCreatedFilesCache();
const locksDir = path.join(testDir, '.locks');

// Same scenario as above, but through writeExclusive, which returns true
// when it created the file.
expect(await writeExclusive(path.join(locksDir, 'first'), '')).toBe(true);
await fs.rm(locksDir, { recursive: true, force: true });
expect(await writeExclusive(path.join(locksDir, 'second'), '')).toBe(
true
);
});
});

describe('paginatedFileSystemQuery', () => {
// Simple getCreatedAt function that strips .json and tries to parse as ULID
const getCreatedAt = (filename: string): Date | null => {
Expand Down
68 changes: 56 additions & 12 deletions packages/world-local/src/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ async function withWindowsRetry<T>(
// In-memory cache of created files to avoid expensive fs.access() calls
// This is safe because we only write once per file path (no overwrites without explicit flag)
const createdFilesCache = new Set<string>();
// Writes repeatedly target a small fixed set of entity directories. Once one
// exists in this process, avoid another recursive mkdir syscall per event.
const createdDirectoriesCache = new Set<string>();

/**
* Clear the created files cache. Useful for testing or when files are deleted externally.
* Clear write-path caches. Useful for testing or when files are deleted externally.
*/
export function clearCreatedFilesCache(): void {
// Forget which file paths this process has already written.
createdFilesCache.clear();
// Forget created directories as well, so the next ensureDir() call for any
// path goes back to fs.mkdir instead of returning early.
createdDirectoriesCache.clear();
}

export { ulidToDate } from '@workflow/world';
Expand Down Expand Up @@ -252,13 +256,38 @@ export async function listTaggedFilesByExtension(
}

/**
 * Ensure that `dirPath` exists, creating missing parent directories as needed.
 *
 * The path is resolved with `path.resolve` and looked up in
 * `createdDirectoriesCache`; a hit returns without touching the filesystem.
 * A path is added to the cache only after `fs.mkdir` has resolved.
 *
 * NOTE(review): this text comes from a rendered diff without +/- markers. The
 * `fs.mkdir(dirPath, ...)` line is presumably the removed line and the
 * `fs.mkdir(resolvedPath, ...)` line its replacement — confirm against the
 * actual file before relying on both calls being present.
 *
 * @param dirPath - Directory to create; it is resolved with `path.resolve`
 *   before being used as the cache key.
 * @returns Resolves once the directory has been handled. Errors thrown by
 *   `fs.mkdir` are swallowed rather than propagated.
 */
export async function ensureDir(dirPath: string): Promise<void> {
const resolvedPath = path.resolve(dirPath);
// Fast path: this process already created the directory, so skip the syscall.
if (createdDirectoriesCache.has(resolvedPath)) {
return;
}
try {
await fs.mkdir(dirPath, { recursive: true });
await fs.mkdir(resolvedPath, { recursive: true });
// Cache only after mkdir succeeded, so a failed attempt is retried next time.
createdDirectoriesCache.add(resolvedPath);
} catch (_error) {
// Best-effort: every mkdir error is ignored here, not only "already exists".
// The path stays out of the cache, so a later call will attempt mkdir again.
}
}

/**
 * Run `operation` with `dirPath` guaranteed to have been passed through
 * `ensureDir` first, retrying exactly once if the operation reports ENOENT.
 *
 * @param dirPath - Directory the operation writes into.
 * @param operation - Filesystem work to perform; it may be invoked twice.
 * @returns Whatever `operation` resolves with, from the first attempt or the retry.
 * @throws Any non-ENOENT error from the first attempt, or any error from the retry.
 */
async function withEnsuredDirectory<T>(
  dirPath: string,
  operation: () => Promise<T>
): Promise<T> {
  await ensureDir(dirPath);

  try {
    return await operation();
  } catch (failure) {
    const missingPath = (failure as NodeJS.ErrnoException).code === 'ENOENT';
    if (!missingPath) {
      throw failure;
    }
  }

  // Reaching this point means the first attempt failed with ENOENT. A dev
  // server may outlive an external cleanup of its data directory, so drop the
  // stale cache entry, recreate the directory, and try one more time.
  createdDirectoriesCache.delete(path.resolve(dirPath));
  await ensureDir(dirPath);
  return operation();
}

interface WriteOptions {
overwrite?: boolean;
}
Expand Down Expand Up @@ -342,10 +371,11 @@ export async function write(
const tempPath = `${filePath}.tmp.${ulid()}`;
let tempFileCreated = false;
try {
await ensureDir(path.dirname(filePath));
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(tempPath, data);
tempFileCreated = true;
await withWindowsRetry(() => fs.rename(tempPath, filePath));
});
// Track this file in cache so future writes know it exists
createdFilesCache.add(filePath);
} catch (error) {
Expand Down Expand Up @@ -405,10 +435,11 @@ export async function writeExclusive(
filePath: string,
data: string
): Promise<boolean> {
await ensureDir(path.dirname(filePath));
try {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
return await withEnsuredDirectory(path.dirname(filePath), async () => {
await fs.writeFile(filePath, data, { flag: 'wx' });
return true;
});
} catch (error: any) {
if (error.code === 'EEXIST') {
return false;
Expand Down Expand Up @@ -439,6 +470,12 @@ export async function listFilesByExtension(
interface PaginatedFileSystemQueryConfig<T> {
directory: string;
schema: z.ZodType<T>;
/**
* Optional immutable-item cache, keyed by absolute file path. Event files
* are append-only, so world-local can replay events it just persisted
* without rereading and reparsing them from disk.
*/
cachedItems?: ReadonlyMap<string, T>;
filePrefix?: string;
fileIdFilter?: (fileId: string) => boolean;
filter?: (item: T) => boolean;
Expand Down Expand Up @@ -474,6 +511,7 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const {
directory,
schema,
cachedItems,
filePrefix,
fileIdFilter,
filter,
Expand All @@ -493,8 +531,10 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
assertSafeEntityId('filePrefix', filePrefix);
}

const resolvedDirectory = path.resolve(directory);

// 1. Get all JSON files in directory
const fileIds = await listJSONFiles(directory);
const fileIds = await listJSONFiles(resolvedDirectory);

// 2. Filter by prefix if provided
const relevantFileIds = filePrefix
Expand Down Expand Up @@ -541,10 +581,14 @@ export async function paginatedFileSystemQuery<T extends { createdAt: Date }>(
const validItems: T[] = [];

for (const fileId of candidateFileIds) {
const filePath = path.join(directory, `${fileId}.json`);
const filePath = path.join(resolvedDirectory, `${fileId}.json`);
let item: T | null = null;
try {
item = await readJSON(filePath, schema);
const cachedItem = cachedItems?.get(filePath);
Comment thread
pranaygp marked this conversation as resolved.
item =
cachedItem === undefined
? await readJSON(filePath, schema)
: structuredClone(cachedItem);
} catch (error: unknown) {
// We don't expect zod errors to happen, but if the JSON does get malformed,
// we skip the item. Preferably, we'd have a way to mark items as malformed,
Expand Down
9 changes: 8 additions & 1 deletion packages/world-local/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
const mergedConfig = { ...config.value, ...definedArgs };
const tag = mergedConfig.tag;
const queue = createQueue(mergedConfig);
const storage = createStorage(mergedConfig.dataDir, tag);
const { clearCache: clearStorageCache, ...storage } = createStorage(
mergedConfig.dataDir,
tag
);
const recoverActiveRuns = mergedConfig.recoverActiveRuns ?? true;
return {
specVersion: SPEC_VERSION_CURRENT,
Expand Down Expand Up @@ -94,9 +97,11 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
await reenqueueActiveRuns(recoveryRuns, queue.queue, 'world-local');
},
async close() {
clearStorageCache();
await queue.close();
},
async clear() {
clearStorageCache();
if (tag) {
// Selectively delete only files matching this tag
const basedir = mergedConfig.dataDir;
Expand Down Expand Up @@ -159,6 +164,8 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
// Clear the in-memory write cache so deleted paths are forgotten
clearCreatedFilesCache();
} else {
// `rm()` removes directories that the write path may have cached.
clearCreatedFilesCache();
await rm(mergedConfig.dataDir, { recursive: true, force: true });
await initDataDir(mergedConfig.dataDir);
}
Expand Down
Loading
Loading