Draft
23 commits
80545bb
fix(core): don't flag wait_completed.resumeAt mismatch without a reco…
TooTallNate May 30, 2026
be32d32
chore(core): add changeset; drop timing race in resumeAt replay test
VaguelySerious May 31, 2026
4cc9b43
Merge branch 'stable' into nate/fix-reused-sleep-resumeat-replay
VaguelySerious May 31, 2026
f00d6da
fix(core): always flag an invalid wait_completed.resumeAt
VaguelySerious May 31, 2026
4dd6bec
Merge branch 'stable' into nate/fix-reused-sleep-resumeat-replay
VaguelySerious May 31, 2026
ed8f9d7
Merge branch 'stable' into nate/fix-reused-sleep-resumeat-replay
VaguelySerious Jun 1, 2026
d392c7e
[e2e] Improve error labeling in event-log-race-repro CI job (#2190)
VaguelySerious Jun 1, 2026
8f41186
[e2e] Make event-log-race-repro summary actionable: full regression l…
VaguelySerious Jun 1, 2026
44bcea5
[world-vercel] [core] Add option to limit queue concurrency to one
VaguelySerious Jun 1, 2026
68a058d
TEMP: force strict concurrency on for CI (revert before merge)
VaguelySerious Jun 1, 2026
dee4370
[e2e] Capture where a stuck repro run wedged
VaguelySerious Jun 1, 2026
2864608
Merge remote-tracking branch 'origin/stable' into nate/fix-reused-sle…
VaguelySerious Jun 1, 2026
31d5b99
[e2e] Treat slow-but-completed repro runs as non-gating, not stuck
VaguelySerious Jun 1, 2026
a9b68c0
[e2e] Retry transient fetch failures in the repro harness; label wher…
VaguelySerious Jun 1, 2026
012234b
[e2e] Make event-log-race-repro summary actionable: full regression l…
VaguelySerious Jun 1, 2026
32adc82
[e2e] Capture where a stuck repro run wedged
VaguelySerious Jun 1, 2026
b99c748
[e2e] Treat slow-but-completed repro runs as non-gating, not stuck
VaguelySerious Jun 1, 2026
9f465e5
[e2e] Retry transient fetch failures in the repro harness; label wher…
VaguelySerious Jun 1, 2026
49b3057
Merge branch 'stable' into nate/fix-reused-sleep-resumeat-replay
VaguelySerious Jun 1, 2026
b5ae489
[e2e] Classify polled run failures from the structured error, not a m…
VaguelySerious Jun 1, 2026
5fe57cc
[e2e] Classify polled run failures from the structured error, not a m…
VaguelySerious Jun 1, 2026
f126a99
Merge remote-tracking branch 'origin/stable' into peter/enforce-stric…
VaguelySerious Jun 1, 2026
72540fe
Merge remote-tracking branch 'origin/nate/fix-reused-sleep-resumeat-r…
VaguelySerious Jun 1, 2026
8 changes: 8 additions & 0 deletions .changeset/enforce-strict-concurrency.md
@@ -0,0 +1,8 @@
---
"@workflow/world-vercel": minor
"@workflow/builders": minor
"@workflow/next": minor
"@workflow/sveltekit": patch
---

Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected.
5 changes: 5 additions & 0 deletions .changeset/sleep-resumeat-no-recorded-value.md
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Fix a spurious `CorruptedEventLogError` on replay when a duration-based `sleep()`'s `wait_completed` is validated without a recorded `wait_created` value.
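
The behaviour described in this changeset can be pictured as a two-stage check. The following is a minimal sketch under stated assumptions, not the code in `@workflow/core`: `validateWaitCompleted`, `WaitQueueItem`, and the local `CorruptedEventLogError` class are hypothetical stand-ins, and only the `hasCreatedEvent` flag and the `resumeAt` field are taken from the tests in this PR.

```typescript
// Hypothetical stand-in for the real error class exported by @workflow/core.
class CorruptedEventLogError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'CorruptedEventLogError';
  }
}

// Hypothetical shape of the consumer-side state for a single sleep() call.
interface WaitQueueItem {
  correlationId: string;
  // The recorded value when hasCreatedEvent is true; otherwise a value freshly
  // computed from the duration, which drifts with the replay clock.
  resumeAt: Date;
  // True once a wait_created event has been consumed for this sleep.
  hasCreatedEvent: boolean;
}

function validateWaitCompleted(item: WaitQueueItem, eventResumeAt: Date): void {
  // Stage 1: a malformed resumeAt is corrupt regardless of hasCreatedEvent.
  if (Number.isNaN(eventResumeAt.getTime())) {
    throw new CorruptedEventLogError(
      `wait_completed for ${item.correlationId} carries an Invalid Date resumeAt`
    );
  }
  // Stage 2: compare only against a value that was actually recorded.
  // Without a consumed wait_created, the local value is wall-clock-relative
  // and must not be treated as the expected value.
  if (
    item.hasCreatedEvent &&
    item.resumeAt.getTime() !== eventResumeAt.getTime()
  ) {
    throw new CorruptedEventLogError(
      `wait_completed for ${item.correlationId} has a mismatched resumeAt`
    );
  }
}
```

With this ordering, skipping the equality check for an unrecorded value does not also swallow an invalid date, which is the distinction the two new tests in `sleep.test.ts` exercise.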
12 changes: 12 additions & 0 deletions docs/content/docs/deploying/world/vercel-world.mdx
@@ -111,6 +111,18 @@ Vercel team ID for API requests. Automatically detected.

Custom base URL for the Vercel workflow API. Automatically detected.

### `ENFORCE_STRICT_CONCURRENCY`

Set to `1` to guarantee that **at most one orchestrator (flow) invocation runs at a time per workflow run**. By default, the runtime relies on idempotency and the event log to tolerate concurrent flow invocations of the same run; enabling this opt-in mode adds a hard concurrency limit on top.

When enabled, each run is given its own queue topic and the flow route is configured with `maxConcurrency: 1`, so [Vercel Queue](https://vercel.com/docs/queues) processes flow messages for a given run strictly one at a time. Step routes are unaffected and continue to run with full concurrency.

<Callout type="warn">
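This variable is read at **both build time and runtime**: the build uses it to write `maxConcurrency: 1` into the flow route's trigger config, and the runtime uses it to send each run's flow messages to a per-run queue topic. Set it as a project-level environment variable that applies to both your build and your deployed functions. Setting it for only one produces an inconsistent configuration in which the concurrency limit and the per-run topics no longer line up.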

Enabling strict concurrency creates one queue topic per run, which increases the number of distinct queues surfaced in queue observability. Leave it off unless you specifically need the per-run serialization guarantee.
</Callout>
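
As a rough illustration of the runtime half of this setting, the sketch below shows how a per-run topic could be derived. It is not the implementation in `@workflow/world-vercel`: the helper names `isStrictConcurrency` and `flowTopicFor` and the `_` separator before the run ID are assumptions made for this example. Only the rule that the exact value `1` enables the mode comes from this page.

```typescript
type Env = Record<string, string | undefined>;

// Only the exact string '1' enables the mode; values such as 'true' do not.
function isStrictConcurrency(env: Env): boolean {
  return env.ENFORCE_STRICT_CONCURRENCY === '1';
}

// HYPOTHETICAL naming scheme: append the run ID to the flow topic so that
// every run gets its own concrete topic. The real format may differ.
function flowTopicFor(baseTopic: string, runId: string, env: Env): string {
  return isStrictConcurrency(env) ? `${baseTopic}_${runId}` : baseTopic;
}

// Two runs of the same workflow land on different topics in strict mode,
// so a per-topic limit of one becomes a per-run limit of one.
const strictEnv: Env = { ENFORCE_STRICT_CONCURRENCY: '1' };
const topicA = flowTopicFor('__wkf_workflow_demo', 'wrun_a', strictEnv);
const topicB = flowTopicFor('__wkf_workflow_demo', 'wrun_b', strictEnv);
console.log(topicA !== topicB); // true
```

The point of the sketch is the pairing: a concurrency limit of one applies per concrete topic, so it only serializes a single run when each run has a topic of its own.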

### Programmatic configuration

{/*@skip-typecheck: incomplete code sample*/}
1 change: 1 addition & 0 deletions packages/builders/src/base-builder.ts
@@ -1360,6 +1360,7 @@ export const OPTIONS = handler;`;
topic: string;
consumer: string;
maxDeliveries?: number;
maxConcurrency?: number;
retryAfterSeconds?: number;
initialDelaySeconds?: number;
}>;
50 changes: 50 additions & 0 deletions packages/builders/src/constants.test.ts
@@ -0,0 +1,50 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

describe('getWorkflowQueueTrigger', () => {
let originalStrict: string | undefined;

beforeEach(() => {
originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY;
});

afterEach(() => {
if (originalStrict !== undefined) {
process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict;
} else {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
}
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('omits maxConcurrency by default', () => {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
const trigger = getWorkflowQueueTrigger();
expect(trigger.topic).toBe('__wkf_workflow_*');
expect('maxConcurrency' in trigger).toBe(false);
});

it('sets maxConcurrency: 1 when ENFORCE_STRICT_CONCURRENCY=1', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
const trigger = getWorkflowQueueTrigger();
expect(trigger).toMatchObject({
topic: '__wkf_workflow_*',
maxConcurrency: 1,
});
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('does not set maxConcurrency for non-"1" values', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = 'true';
const trigger = getWorkflowQueueTrigger();
expect('maxConcurrency' in trigger).toBe(false);
});

it('never applies concurrency to the step trigger', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
expect('maxConcurrency' in STEP_QUEUE_TRIGGER).toBe(false);
});
});
23 changes: 23 additions & 0 deletions packages/builders/src/constants.ts
@@ -21,3 +21,26 @@ export const WORKFLOW_QUEUE_TRIGGER = {
retryAfterSeconds: 5, // Delay between retries (default: 60)
initialDelaySeconds: 0, // Initial delay before first delivery (default: 0)
};

/**
* Returns the queue trigger configuration for workflow (flow) routes.
*
* When `ENFORCE_STRICT_CONCURRENCY` is enabled, sets `maxConcurrency: 1` so
* VQS processes at most one flow invocation per concrete topic at a time.
* Paired with the per-run physical topic naming in `@workflow/world-vercel`
* (which appends the run id to the flow topic), this enforces at most one
* orchestrator invocation per run. Step routes are intentionally excluded.
*
* Must be read at build time, where the env var gates what is written into
* the route's `experimentalTriggers` config.
*/
export function getWorkflowQueueTrigger() {
// TEMP(ci-default-on): force strict concurrency ON regardless of the env var so
// CI exercises maxConcurrency across all e2e jobs. REVERT BEFORE MERGE — restore
// the `...(process.env.ENFORCE_STRICT_CONCURRENCY === '1' && { maxConcurrency: 1 })`
// gate.
return {
...WORKFLOW_QUEUE_TRIGGER,
maxConcurrency: 1,
};
}
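
For reference, the gate that the TEMP comment says must be restored before merge could look roughly like the sketch below. It is an illustration under assumptions rather than the final code: it takes the environment as a parameter instead of reading `process.env` so it can run standalone, `BASE_TRIGGER` stands in for `WORKFLOW_QUEUE_TRIGGER`, and the `consumer` value is a placeholder. The TEMP comment spells the same gate with `&&`; a ternary is used here only for clarity.

```typescript
type Env = Record<string, string | undefined>;

// Stand-in for WORKFLOW_QUEUE_TRIGGER. The consumer value is a placeholder,
// not the real one; the other fields mirror what this diff shows.
const BASE_TRIGGER = {
  topic: '__wkf_workflow_*',
  consumer: 'example-consumer',
  retryAfterSeconds: 5,
  initialDelaySeconds: 0,
};

// Hypothetical name, chosen to avoid clashing with the real export.
function gatedWorkflowQueueTrigger(env: Env): typeof BASE_TRIGGER & {
  maxConcurrency?: number;
} {
  return {
    ...BASE_TRIGGER,
    // The key is added only for the exact value '1', so by default the
    // trigger has no maxConcurrency property at all.
    ...(env.ENFORCE_STRICT_CONCURRENCY === '1' ? { maxConcurrency: 1 } : {}),
  };
}
```

Omitting the key entirely, rather than setting it to `undefined`, is what lets the skipped tests assert `'maxConcurrency' in trigger` is `false` once the gate is back.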
6 changes: 5 additions & 1 deletion packages/builders/src/index.ts
@@ -9,7 +9,11 @@ export {
getDecoratorOptionsForDirectory,
getDecoratorOptionsForDirectoryWithConfigPath,
} from './config-helpers.js';
export { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
export {
getWorkflowQueueTrigger,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
} from './constants.js';
export { createDiscoverEntriesPlugin } from './discover-entries-esbuild-plugin.js';
export {
clearModuleSpecifierCache,
4 changes: 2 additions & 2 deletions packages/builders/src/vercel-build-output-api.ts
@@ -1,7 +1,7 @@
import { copyFile, mkdir, writeFile } from 'node:fs/promises';
import { join, resolve } from 'node:path';
import { BaseBuilder } from './base-builder.js';
import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

export class VercelBuildOutputAPIBuilder extends BaseBuilder {
async build(): Promise<void> {
@@ -114,7 +114,7 @@ export class VercelBuildOutputAPIBuilder extends BaseBuilder {
await this.createPackageJson(workflowsFuncDir, 'commonjs');
await this.createVcConfig(workflowsFuncDir, {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
runtime: this.config.runtime,
});

89 changes: 46 additions & 43 deletions packages/core/e2e/event-log-race-repro.test.ts
@@ -417,7 +417,11 @@ async function describeStuckRun(
// settled yet. `completed` past the poll budget is downgraded to a non-gating
// SLOW_COMPLETION (slow, not wedged); `failed`/`cancelled` keep their meaning.
function classifyTerminalRun(
runData: { status: string; errorCode?: string },
// A failed WorkflowRun carries its reason in `error: { code, message }` — the
// run has no top-level `errorCode`. Reading the structured error is what lets
// us classify USER_ERROR/RUNTIME_ERROR/CORRUPTED_EVENT_LOG (vs. uncategorised
// `other`) and surface *why* it failed in the summary.
runData: { status: string; error?: { code?: string; message?: string } },
context: {
runId: string;
scenario: Scenario;
@@ -450,8 +454,9 @@ function classifyTerminalRun(
if (runData.status === 'failed') {
return {
...base,
outcome: classifyFailure(runData.errorCode),
errorCode: runData.errorCode,
outcome: classifyFailure(runData.error?.code),
errorCode: runData.error?.code,
errorMessage: runData.error?.message,
};
}
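
The change above can be summarised with a small standalone sketch. The error codes `USER_ERROR`, `RUNTIME_ERROR`, and `CORRUPTED_EVENT_LOG` come from the comment in this hunk. The outcome labels, the `PolledRun` type, and both function names are hypothetical and do not reproduce the harness's real `classifyFailure`.

```typescript
// Hypothetical outcome labels; the real harness may name these differently.
type FailureOutcome =
  | 'user_error'
  | 'runtime_error'
  | 'corrupted_event_log'
  | 'other';

// A failed run carries its reason in a structured `error` object.
interface PolledRun {
  status: string;
  error?: { code?: string; message?: string };
}

function classifyFailureSketch(code: string | undefined): FailureOutcome {
  switch (code) {
    case 'USER_ERROR':
      return 'user_error';
    case 'RUNTIME_ERROR':
      return 'runtime_error';
    case 'CORRUPTED_EVENT_LOG':
      return 'corrupted_event_log';
    default:
      // A missing or unknown code falls through to the uncategorised bucket.
      return 'other';
  }
}

function summarizeFailedRun(run: PolledRun) {
  if (run.status !== 'failed') return undefined;
  return {
    outcome: classifyFailureSketch(run.error?.code),
    errorCode: run.error?.code,
    errorMessage: run.error?.message,
  };
}
```

Reading a top-level `errorCode` that the run never carries would always yield `undefined`, so every failure would land in `other`. Reading `error.code` is what makes the specific buckets reachable.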

@@ -1017,44 +1022,42 @@ describe('event log race repro', () => {
}
});

test(
'event log races do not corrupt, stall, or take stale branches',
{ timeout: testTimeoutMs },
async () => {
const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2);
const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2);
const results = [
...(await runScenario(
config.hookSleepAttempts,
config.concurrency,
runHookSleepAttempt
)),
...(await runScenario(
config.stepFanoutAttempts,
config.stepConcurrency,
runStepFanoutAttempt
)),
...(await runScenario(
stepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'step')
)),
...(await runScenario(
sleepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'sleep')
)),
];
writeResults(results);

// Only event-log regressions fail the job. `infra` outcomes are
// harness-side timing races (hook resume vs. sleep budget) and transport
// errors — they are recorded and surfaced in the summary, but do not
// gate, matching `--check` in the renderer script.
const regressions = results.filter(
(result) => result.outcome !== 'completed' && result.outcome !== 'infra'
);
expect(regressions).toEqual([]);
}
);
test('event log races do not corrupt, stall, or take stale branches', {
timeout: testTimeoutMs,
}, async () => {
const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2);
const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2);
const results = [
...(await runScenario(
config.hookSleepAttempts,
config.concurrency,
runHookSleepAttempt
)),
...(await runScenario(
config.stepFanoutAttempts,
config.stepConcurrency,
runStepFanoutAttempt
)),
...(await runScenario(
stepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'step')
)),
...(await runScenario(
sleepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'sleep')
)),
];
writeResults(results);

// Only event-log regressions fail the job. `infra` outcomes are
// harness-side timing races (hook resume vs. sleep budget) and transport
// errors — they are recorded and surfaced in the summary, but do not
// gate, matching `--check` in the renderer script.
const regressions = results.filter(
(result) => result.outcome !== 'completed' && result.outcome !== 'infra'
);
expect(regressions).toEqual([]);
});
});
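
Two details of the test above are easy to miss: how an odd attempt count is divided between the step-biased and sleep-biased scenarios, and which outcomes actually gate the job. The sketch below restates both in isolation. `splitAttempts`, `findRegressions`, and the sample outcome label `corrupted` are hypothetical names for this illustration; the `ceil`/`floor` split and the `completed`/`infra` filter are taken from the test body.

```typescript
// Splits a total so the two halves always sum back to the total;
// the step-biased scenario receives the extra attempt when total is odd.
function splitAttempts(total: number): {
  stepBiased: number;
  sleepBiased: number;
} {
  return {
    stepBiased: Math.ceil(total / 2),
    sleepBiased: Math.floor(total / 2),
  };
}

interface AttemptResult {
  outcome: string;
}

// Anything other than `completed` or `infra` counts as a regression.
function findRegressions<T extends AttemptResult>(results: T[]): T[] {
  return results.filter(
    (result) => result.outcome !== 'completed' && result.outcome !== 'infra'
  );
}

const { stepBiased, sleepBiased } = splitAttempts(7);
console.log(stepBiased, sleepBiased); // 4 3

const regressions = findRegressions([
  { outcome: 'completed' },
  { outcome: 'infra' },
  { outcome: 'corrupted' }, // placeholder label for a gating outcome
]);
console.log(regressions.length); // 1
```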
102 changes: 98 additions & 4 deletions packages/core/src/workflow/sleep.test.ts
@@ -10,14 +10,20 @@ import type { WorkflowOrchestratorContext } from '../private.js';
import { createContext } from '../vm/index.js';
import { createSleep } from './sleep.js';

const DEFAULT_FIXED_TIMESTAMP = 1753481739458;

// Helper to setup context to simulate a workflow run
function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
function setupWorkflowContext(
events: Event[]
): WorkflowOrchestratorContext & { updateTimestamp: (ts: number) => void } {
const context = createContext({
seed: 'test',
fixedTimestamp: 1753481739458,
fixedTimestamp: DEFAULT_FIXED_TIMESTAMP,
});
const ulid = monotonicFactory(() => context.globalThis.Math.random());
const workflowStartedAt = context.globalThis.Date.now();
// The workflow's ULID seed (correlationIds) is derived from the original
// start time and is stable across replays, even if the wall clock advances.
const workflowStartedAt = DEFAULT_FIXED_TIMESTAMP;
const ctx: WorkflowOrchestratorContext = {
runId: 'wrun_test',
encryptionKey: undefined,
@@ -43,7 +49,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
pendingDeliveries: 0,
pendingDeliveryBarriers: new Map(),
};
return ctx;
return Object.assign(ctx, { updateTimestamp: context.updateTimestamp });
}

describe('createSleep', () => {
@@ -143,6 +149,94 @@ describe('createSleep', () => {
expect(workflowError?.message).toContain('wait_01K11TFZ62YS0YYFDQ3E8B9YCV');
});

it('does not flag wait_completed.resumeAt when no wait_created was applied and the replay clock advanced', async () => {
// Production replay divergence (reused-sleep): a `wait_completed` is
// consumed by a sleep consumer that never consumed a `wait_created`
// (hasCreatedEvent=false), so the queue item still holds the value freshly
// computed by parseDurationToDate(duration) = Date.now() + duration.
//
// Because `sleep(<number|string>)` resumeAt is wall-clock-relative and the
// VM clock advances to each event's createdAt during replay, that fresh
// value differs from the absolute resumeAt the ORIGINAL run recorded into
// the event. The recorded resumeAt is the source of truth; the consumer's
// recomputed value is non-deterministic and must NOT be treated as the
// expected value. Captured from production: hasCreatedEvent=false with an
// ~18-42s delta between the recomputed and recorded resumeAt.
const recordedResumeAt = new Date(DEFAULT_FIXED_TIMESTAMP + 5_000);

const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_123',
eventType: 'wait_completed',
correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { resumeAt: recordedResumeAt },
createdAt: new Date(DEFAULT_FIXED_TIMESTAMP + 5_100),
},
]);

// Simulate replay drift: the wall clock has advanced 30s by the time the
// workflow re-invokes sleep(5000), so parseDurationToDate computes a
// resumeAt 30s ahead of the recorded one (the correlationId ULID seed is
// unchanged — it derives from the original start time).
ctx.updateTimestamp(DEFAULT_FIXED_TIMESTAMP + 30_000);

const sleepError = withResolvers<Error>();
ctx.onWorkflowError = sleepError.resolve;

const sleep = createSleep(ctx);
const slept = sleep(5_000);

// The bug raises CorruptedEventLogError (and never resolves the sleep);
// the fix lets the sleep resolve with no error. Race the two terminal
// outcomes directly — no timing guard — so a regression surfaces as the
// error branch (or a hang caught by the test timeout), never a flaky race
// against a fixed grace period.
const outcome = await Promise.race([
sleepError.promise.then((err) => ({ kind: 'error' as const, err })),
slept.then(() => ({ kind: 'resolved' as const })),
]);

if (outcome.kind === 'error') {
throw new Error(
`Unexpected workflow error on consistent replay: ${outcome.err.message}`
);
}
expect(outcome.kind).toBe('resolved');
expect(ctx.invocationsQueue.size).toBe(0);
});

it('flags an invalid wait_completed.resumeAt even when no wait_created was applied', async () => {
// Counterpart to the test above: skipping the equality check without a
// recorded value must NOT also swallow a malformed resumeAt. A non-finite
// resumeAt is corrupt data regardless of `hasCreatedEvent` — the original
// run always records a valid parseDurationToDate(...) Date, so a consistent
// log never carries one. Here there is no `wait_created` (hasCreatedEvent
// stays false) yet the Invalid Date must still raise.
const ctx = setupWorkflowContext([
{
eventId: 'evnt_0',
runId: 'wrun_123',
eventType: 'wait_completed',
correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCV',
eventData: { resumeAt: new Date(Number.NaN) },
createdAt: new Date(DEFAULT_FIXED_TIMESTAMP + 5_100),
},
]);

const errorReceived = withResolvers<Error>();
ctx.onWorkflowError = errorReceived.resolve;

const sleep = createSleep(ctx);
void sleep(5_000);

const workflowError = await errorReceived.promise;
expect(workflowError).toBeInstanceOf(CorruptedEventLogError);
expect(workflowError?.message).toContain('wait_completed');
expect(workflowError?.message).toContain('Invalid Date');
expect(workflowError?.message).toContain('wait_01K11TFZ62YS0YYFDQ3E8B9YCV');
});

it('should invoke workflow error handler when wait_completed resumeAt is invalid', async () => {
const ctx = setupWorkflowContext([
{
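
The replay test in `sleep.test.ts` races two terminal outcomes instead of waiting out a fixed grace period. A standalone sketch of that pattern follows. The names `deferred` and `raceTerminalOutcomes` are hypothetical, and `deferred` is a local stand-in for the `withResolvers` helper the tests use, whose implementation is not shown in this diff.

```typescript
// Minimal deferred: a promise plus the function that resolves it.
function deferred<T>(): { promise: Promise<T>; resolve: (value: T) => void } {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((res) => {
    resolve = res;
  });
  return { promise, resolve };
}

type Outcome = { kind: 'error'; err: Error } | { kind: 'resolved' };

// Whichever terminal outcome settles first wins. No timer is involved, so a
// regression shows up as the error branch, or as a hang that the test
// runner's own timeout catches, never as a flaky race against a delay.
function raceTerminalOutcomes(
  work: Promise<unknown>,
  errorSignal: Promise<Error>
): Promise<Outcome> {
  return Promise.race([
    errorSignal.then((err): Outcome => ({ kind: 'error', err })),
    work.then((): Outcome => ({ kind: 'resolved' })),
  ]);
}
```

In a test, `errorSignal` would be the promise resolved by the workflow error handler and `work` the promise returned by the operation under test, with the caller asserting on `outcome.kind`.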