8 changes: 8 additions & 0 deletions .changeset/enforce-strict-concurrency.md
@@ -0,0 +1,8 @@
---
"@workflow/world-vercel": minor
"@workflow/builders": minor
"@workflow/next": minor
"@workflow/sveltekit": patch
---

Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected.
Review comment (Member, Author):

Suggested change
- Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected.
+ Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var, which limits flow route concurrency to one in Vercel environments, at a small queue latency cost.

12 changes: 12 additions & 0 deletions docs/content/docs/deploying/world/vercel-world.mdx
@@ -111,6 +111,18 @@ Vercel team ID for API requests. Automatically detected.

Custom base URL for the Vercel workflow API. Automatically detected.

### `ENFORCE_STRICT_CONCURRENCY`

Set to `1` to guarantee that **at most one orchestrator (flow) invocation runs at a time per workflow run**. By default, the runtime relies on idempotency and the event log to tolerate concurrent flow invocations of the same run; enabling this opt-in mode adds a hard concurrency limit on top.

When enabled, each run is given its own queue topic and the flow route is configured with `maxConcurrency: 1`, so [Vercel Queue](https://vercel.com/docs/queues) processes flow messages for a given run strictly one at a time. Step routes are unaffected and continue to run with full concurrency.

<Callout type="warn">
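This variable is read at **both build time and runtime**, so it must be set as a project-level environment variable that applies to both your build and your deployed functions. Setting it for only one of the two produces an inconsistent configuration: the build writes `maxConcurrency: 1` into the flow trigger, while per-run topic naming happens at runtime, and each depends on the other.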

Enabling strict concurrency creates one queue topic per run, which increases the number of distinct queues surfaced in queue observability. Leave it off unless you specifically need the per-run serialization guarantee.
</Callout>
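
To make the per-run topic idea concrete, here is a minimal sketch. It is not the code in `@workflow/world-vercel`, whose naming scheme is not shown on this page: `flowTopicFor`, `groupByTopic`, the `_` separator, and the workflow-name component are all hypothetical. Only the `__wkf_workflow_` prefix comes from the wildcard trigger topic used by the builders.

```typescript
// Prefix taken from the wildcard trigger topic '__wkf_workflow_*'.
const FLOW_TOPIC_PREFIX = '__wkf_workflow_';

// Hypothetical naming helper. The '_' separator and the argument order are
// assumptions made for illustration only.
function flowTopicFor(workflowName: string, runId: string, strict: boolean): string {
  const shared = `${FLOW_TOPIC_PREFIX}${workflowName}`;
  return strict ? `${shared}_${runId}` : shared;
}

// Groups messages by concrete topic. With `maxConcurrency: 1`, each group
// would be drained one message at a time, while separate groups (separate
// runs, in strict mode) proceed independently of each other.
function groupByTopic(
  messages: Array<{ workflowName: string; runId: string }>,
  strict: boolean
): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const { workflowName, runId } of messages) {
    const topic = flowTopicFor(workflowName, runId, strict);
    const runIds = groups.get(topic) ?? [];
    runIds.push(runId);
    groups.set(topic, runIds);
  }
  return groups;
}
```

In strict mode, two runs of the same workflow land on two distinct topics, which is why the concurrency limit serializes invocations within a run rather than across all runs. It is also why the number of distinct queues grows with the number of runs.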

### Programmatic configuration

{/*@skip-typecheck: incomplete code sample*/}
1 change: 1 addition & 0 deletions packages/builders/src/base-builder.ts
@@ -1360,6 +1360,7 @@ export const OPTIONS = handler;`;
topic: string;
consumer: string;
maxDeliveries?: number;
maxConcurrency?: number;
retryAfterSeconds?: number;
initialDelaySeconds?: number;
}>;
50 changes: 50 additions & 0 deletions packages/builders/src/constants.test.ts
@@ -0,0 +1,50 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

describe('getWorkflowQueueTrigger', () => {
let originalStrict: string | undefined;

beforeEach(() => {
originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY;
});

afterEach(() => {
if (originalStrict !== undefined) {
process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict;
} else {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
}
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('omits maxConcurrency by default', () => {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
const trigger = getWorkflowQueueTrigger();
expect(trigger.topic).toBe('__wkf_workflow_*');
expect('maxConcurrency' in trigger).toBe(false);
});

it('sets maxConcurrency: 1 when ENFORCE_STRICT_CONCURRENCY=1', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
const trigger = getWorkflowQueueTrigger();
expect(trigger).toMatchObject({
topic: '__wkf_workflow_*',
maxConcurrency: 1,
});
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('does not set maxConcurrency for non-"1" values', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = 'true';
const trigger = getWorkflowQueueTrigger();
expect('maxConcurrency' in trigger).toBe(false);
});

it('never applies concurrency to the step trigger', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
expect('maxConcurrency' in STEP_QUEUE_TRIGGER).toBe(false);
});
});
23 changes: 23 additions & 0 deletions packages/builders/src/constants.ts
@@ -21,3 +21,26 @@ export const WORKFLOW_QUEUE_TRIGGER = {
retryAfterSeconds: 5, // Delay between retries (default: 60)
initialDelaySeconds: 0, // Initial delay before first delivery (default: 0)
};

/**
* Returns the queue trigger configuration for workflow (flow) routes.
*
* When `ENFORCE_STRICT_CONCURRENCY` is enabled, sets `maxConcurrency: 1` so
* Vercel Queue (VQS) processes at most one flow invocation per concrete topic at a time.
* Paired with the per-run physical topic naming in `@workflow/world-vercel`
* (which appends the run id to the flow topic), this enforces at most one
* orchestrator invocation per run. Step routes are intentionally excluded.
*
* Must be read at build time, where the env var gates what is written into
* the route's `experimentalTriggers` config.
*/
export function getWorkflowQueueTrigger() {
// TEMP(ci-default-on): force strict concurrency ON regardless of the env var so
// CI exercises maxConcurrency across all e2e jobs. REVERT BEFORE MERGE — restore
// the `...(process.env.ENFORCE_STRICT_CONCURRENCY === '1' && { maxConcurrency: 1 })`
// gate.
return {
...WORKFLOW_QUEUE_TRIGGER,
maxConcurrency: 1,
};
}
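
For reference, the gate that the TEMP comment asks to restore before merge could look roughly like the sketch below. It is an illustration, not the final code: `getWorkflowQueueTriggerGated` and its `env` parameter are hypothetical (the real function takes no arguments and reads `process.env`), and a ternary stands in for the `&&` spread quoted in the comment. The base trigger fields are copied from the inline trigger removed from `packages/sveltekit/src/index.ts` in this PR.

```typescript
type QueueTrigger = {
  type: string;
  topic: string;
  consumer: string;
  maxConcurrency?: number;
  retryAfterSeconds?: number;
  initialDelaySeconds?: number;
};

// Base flow trigger, using the values visible elsewhere in this diff.
const WORKFLOW_QUEUE_TRIGGER: QueueTrigger = {
  type: 'queue/v2beta',
  topic: '__wkf_workflow_*',
  consumer: 'default',
  retryAfterSeconds: 5,
  initialDelaySeconds: 0,
};

// Hypothetical variant that receives the environment as an argument so the
// gate can be exercised without mutating process.env.
function getWorkflowQueueTriggerGated(
  env: Record<string, string | undefined>
): QueueTrigger {
  return {
    ...WORKFLOW_QUEUE_TRIGGER,
    // Only the exact string '1' enables strict mode; 'true' or 'yes' do not.
    ...(env.ENFORCE_STRICT_CONCURRENCY === '1' ? { maxConcurrency: 1 } : {}),
  };
}
```

With the gate in place, the key is absent (not `undefined`) when the variable is unset or set to anything other than `'1'`, which is what the two currently skipped tests in `constants.test.ts` check with `'maxConcurrency' in trigger`.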
6 changes: 5 additions & 1 deletion packages/builders/src/index.ts
@@ -9,7 +9,11 @@ export {
getDecoratorOptionsForDirectory,
getDecoratorOptionsForDirectoryWithConfigPath,
} from './config-helpers.js';
export { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
export {
getWorkflowQueueTrigger,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
} from './constants.js';
export { createDiscoverEntriesPlugin } from './discover-entries-esbuild-plugin.js';
export {
clearModuleSpecifierCache,
4 changes: 2 additions & 2 deletions packages/builders/src/vercel-build-output-api.ts
@@ -1,7 +1,7 @@
import { copyFile, mkdir, writeFile } from 'node:fs/promises';
import { join, resolve } from 'node:path';
import { BaseBuilder } from './base-builder.js';
import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

export class VercelBuildOutputAPIBuilder extends BaseBuilder {
async build(): Promise<void> {
@@ -114,7 +114,7 @@ export class VercelBuildOutputAPIBuilder extends BaseBuilder {
await this.createPackageJson(workflowsFuncDir, 'commonjs');
await this.createVcConfig(workflowsFuncDir, {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
runtime: this.config.runtime,
});

89 changes: 46 additions & 43 deletions packages/core/e2e/event-log-race-repro.test.ts
@@ -417,7 +417,11 @@ async function describeStuckRun(
// settled yet. `completed` past the poll budget is downgraded to a non-gating
// SLOW_COMPLETION (slow, not wedged); `failed`/`cancelled` keep their meaning.
function classifyTerminalRun(
runData: { status: string; errorCode?: string },
// A failed WorkflowRun carries its reason in `error: { code, message }` — the
// run has no top-level `errorCode`. Reading the structured error is what lets
// us classify USER_ERROR/RUNTIME_ERROR/CORRUPTED_EVENT_LOG (vs. uncategorised
// `other`) and surface *why* it failed in the summary.
runData: { status: string; error?: { code?: string; message?: string } },
context: {
runId: string;
scenario: Scenario;
@@ -450,8 +454,9 @@ function classifyTerminalRun(
if (runData.status === 'failed') {
return {
...base,
outcome: classifyFailure(runData.errorCode),
errorCode: runData.errorCode,
outcome: classifyFailure(runData.error?.code),
errorCode: runData.error?.code,
errorMessage: runData.error?.message,
};
}
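
The switch from a top-level `errorCode` to the structured `error` object can be illustrated with a small sketch. `classifyFailureSketch` and `summarizeFailure` are hypothetical names, and the outcome labels are assumptions: the harness's real `classifyFailure` and its outcome values are not shown in this hunk. Only the three error codes and the `other` bucket come from the comment on `classifyTerminalRun`.

```typescript
type RunError = { code?: string; message?: string };
type RunData = { status: string; error?: RunError };

// Codes named in the classifyTerminalRun comment. Returning the code itself
// as the outcome label is an illustrative choice, not the harness's real enum.
const KNOWN_CODES = new Set<string>([
  'USER_ERROR',
  'RUNTIME_ERROR',
  'CORRUPTED_EVENT_LOG',
]);

function classifyFailureSketch(code: string | undefined): string {
  return code !== undefined && KNOWN_CODES.has(code) ? code : 'other';
}

// Reads the reason from `error.code` / `error.message`. A run that only had a
// top-level `errorCode` would have no `error` object and would fall into 'other'.
function summarizeFailure(runData: RunData) {
  return {
    outcome: classifyFailureSketch(runData.error?.code),
    errorCode: runData.error?.code,
    errorMessage: runData.error?.message,
  };
}
```

Optional chaining keeps the function safe for failed runs that carry no `error` object at all, in which case both `errorCode` and `errorMessage` stay `undefined`.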

@@ -1017,44 +1022,42 @@ describe('event log race repro', () => {
}
});

test(
'event log races do not corrupt, stall, or take stale branches',
{ timeout: testTimeoutMs },
async () => {
const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2);
const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2);
const results = [
...(await runScenario(
config.hookSleepAttempts,
config.concurrency,
runHookSleepAttempt
)),
...(await runScenario(
config.stepFanoutAttempts,
config.stepConcurrency,
runStepFanoutAttempt
)),
...(await runScenario(
stepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'step')
)),
...(await runScenario(
sleepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'sleep')
)),
];
writeResults(results);

// Only event-log regressions fail the job. `infra` outcomes are
// harness-side timing races (hook resume vs. sleep budget) and transport
// errors — they are recorded and surfaced in the summary, but do not
// gate, matching `--check` in the renderer script.
const regressions = results.filter(
(result) => result.outcome !== 'completed' && result.outcome !== 'infra'
);
expect(regressions).toEqual([]);
}
);
test('event log races do not corrupt, stall, or take stale branches', {
timeout: testTimeoutMs,
}, async () => {
const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2);
const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2);
const results = [
...(await runScenario(
config.hookSleepAttempts,
config.concurrency,
runHookSleepAttempt
)),
...(await runScenario(
config.stepFanoutAttempts,
config.stepConcurrency,
runStepFanoutAttempt
)),
...(await runScenario(
stepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'step')
)),
...(await runScenario(
sleepBiasedAttempts,
config.stepConcurrency,
(attempt) => runStepSleepRaceAttempt(attempt, 'sleep')
)),
];
writeResults(results);

// Only event-log regressions fail the job. `infra` outcomes are
// harness-side timing races (hook resume vs. sleep budget) and transport
// errors — they are recorded and surfaced in the summary, but do not
// gate, matching `--check` in the renderer script.
const regressions = results.filter(
(result) => result.outcome !== 'completed' && result.outcome !== 'infra'
);
expect(regressions).toEqual([]);
});
});
4 changes: 2 additions & 2 deletions packages/next/src/builder-deferred.ts
@@ -55,7 +55,7 @@ export async function getNextBuilderDeferred() {
const {
BaseBuilder: BaseBuilderClass,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
getWorkflowQueueTrigger,
applySwcTransform,
detectWorkflowPatterns,
getImportPath,
@@ -1117,7 +1117,7 @@ export async function getNextBuilderDeferred() {
},
workflows: {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
};

4 changes: 2 additions & 2 deletions packages/next/src/builder-eager.ts
@@ -17,7 +17,7 @@ export async function getNextBuilderEager() {
const {
BaseBuilder: BaseBuilderClass,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
getWorkflowQueueTrigger,
// biome-ignore lint/security/noGlobalEval: Need to use eval here to avoid TypeScript from transpiling the import statement into `require()`
} = (await eval(
'import("@workflow/builders")'
@@ -435,7 +435,7 @@ export async function getNextBuilderEager() {
},
workflows: {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
};

11 changes: 2 additions & 9 deletions packages/sveltekit/src/index.ts
@@ -1,4 +1,5 @@
import path from 'node:path';
import { getWorkflowQueueTrigger } from '@workflow/builders';
import fs from 'fs-extra';

import { SvelteKitBuilder } from './builder.js';
@@ -21,15 +22,7 @@ process.on('beforeExit', () => {
file: '.vercel/output/functions/.well-known/workflow/v1/flow.func/.vc-config.json',
config: {
maxDuration: 'max',
experimentalTriggers: [
{
type: 'queue/v2beta',
topic: '__wkf_workflow_*',
consumer: 'default',
retryAfterSeconds: 5,
initialDelaySeconds: 0,
},
],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
},
{