Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/enforce-strict-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@workflow/world-vercel": minor
"@workflow/builders": minor
"@workflow/next": minor
"@workflow/sveltekit": patch
---

Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected.
Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var, which limits flow route concurrency to one in Vercel environments, at a small queue latency cost.

12 changes: 12 additions & 0 deletions docs/content/docs/deploying/world/vercel-world.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ Vercel team ID for API requests. Automatically detected.

Custom base URL for the Vercel workflow API. Automatically detected.

### `ENFORCE_STRICT_CONCURRENCY`

Set to `1` to guarantee that **at most one orchestrator (flow) invocation runs at a time per workflow run**. By default, the runtime relies on idempotency and the event log to tolerate concurrent flow invocations of the same run; enabling this opt-in mode adds a hard concurrency limit on top.

When enabled, each run is given its own queue topic and the flow route is configured with `maxConcurrency: 1`, so [Vercel Queue](https://vercel.com/docs/queues) processes flow messages for a given run strictly one at a time. Step routes are unaffected and continue to run with full concurrency.

<Callout type="warn">
This variable is read at **both build time and runtime**, so it must be set as a project-level environment variable that applies to your build and your deployed functions. Setting it for only one will produce an inconsistent configuration.

Enabling strict concurrency creates one queue topic per run, which increases the number of distinct queues surfaced in queue observability. Leave it off unless you specifically need the per-run serialization guarantee.
</Callout>

### Programmatic configuration

{/*@skip-typecheck: incomplete code sample*/}
Expand Down
1 change: 1 addition & 0 deletions packages/builders/src/base-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ export const OPTIONS = handler;`;
topic: string;
consumer: string;
maxDeliveries?: number;
maxConcurrency?: number;
retryAfterSeconds?: number;
initialDelaySeconds?: number;
}>;
Expand Down
50 changes: 50 additions & 0 deletions packages/builders/src/constants.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

describe('getWorkflowQueueTrigger', () => {
let originalStrict: string | undefined;

beforeEach(() => {
originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY;
});

afterEach(() => {
if (originalStrict !== undefined) {
process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict;
} else {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
}
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('omits maxConcurrency by default', () => {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
const trigger = getWorkflowQueueTrigger();
expect(trigger.topic).toBe('__wkf_workflow_*');
expect('maxConcurrency' in trigger).toBe(false);
});

it('sets maxConcurrency: 1 when ENFORCE_STRICT_CONCURRENCY=1', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
const trigger = getWorkflowQueueTrigger();
expect(trigger).toMatchObject({
topic: '__wkf_workflow_*',
maxConcurrency: 1,
});
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('does not set maxConcurrency for non-"1" values', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = 'true';
const trigger = getWorkflowQueueTrigger();
expect('maxConcurrency' in trigger).toBe(false);
});

it('never applies concurrency to the step trigger', () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';
expect('maxConcurrency' in STEP_QUEUE_TRIGGER).toBe(false);
});
});
23 changes: 23 additions & 0 deletions packages/builders/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,26 @@ export const WORKFLOW_QUEUE_TRIGGER = {
retryAfterSeconds: 5, // Delay between retries (default: 60)
initialDelaySeconds: 0, // Initial delay before first delivery (default: 0)
};

/**
* Returns the queue trigger configuration for workflow (flow) routes.
*
* When `ENFORCE_STRICT_CONCURRENCY` is enabled, sets `maxConcurrency: 1` so
* VQS processes at most one flow invocation per concrete topic at a time.
* Paired with the per-run physical topic naming in `@workflow/world-vercel`
* (which appends the run id to the flow topic), this enforces at most one
* orchestrator invocation per run. Step routes are intentionally excluded.
*
* Must be read at build time, where the env var gates what is written into
* the route's `experimentalTriggers` config.
*/
export function getWorkflowQueueTrigger() {
// TEMP(ci-default-on): force strict concurrency ON regardless of the env var so
// CI exercises maxConcurrency across all e2e jobs. REVERT BEFORE MERGE — restore
// the `...(process.env.ENFORCE_STRICT_CONCURRENCY === '1' && { maxConcurrency: 1 })`
// gate.
return {
...WORKFLOW_QUEUE_TRIGGER,
maxConcurrency: 1,
};
}
6 changes: 5 additions & 1 deletion packages/builders/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ export {
getDecoratorOptionsForDirectory,
getDecoratorOptionsForDirectoryWithConfigPath,
} from './config-helpers.js';
export { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
export {
getWorkflowQueueTrigger,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
} from './constants.js';
export { createDiscoverEntriesPlugin } from './discover-entries-esbuild-plugin.js';
export {
clearModuleSpecifierCache,
Expand Down
4 changes: 2 additions & 2 deletions packages/builders/src/vercel-build-output-api.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { copyFile, mkdir, writeFile } from 'node:fs/promises';
import { join, resolve } from 'node:path';
import { BaseBuilder } from './base-builder.js';
import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js';
import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js';

export class VercelBuildOutputAPIBuilder extends BaseBuilder {
async build(): Promise<void> {
Expand Down Expand Up @@ -114,7 +114,7 @@ export class VercelBuildOutputAPIBuilder extends BaseBuilder {
await this.createPackageJson(workflowsFuncDir, 'commonjs');
await this.createVcConfig(workflowsFuncDir, {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
runtime: this.config.runtime,
});

Expand Down
4 changes: 2 additions & 2 deletions packages/next/src/builder-deferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export async function getNextBuilderDeferred() {
const {
BaseBuilder: BaseBuilderClass,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
getWorkflowQueueTrigger,
applySwcTransform,
detectWorkflowPatterns,
getImportPath,
Expand Down Expand Up @@ -1117,7 +1117,7 @@ export async function getNextBuilderDeferred() {
},
workflows: {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
};

Expand Down
4 changes: 2 additions & 2 deletions packages/next/src/builder-eager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function getNextBuilderEager() {
const {
BaseBuilder: BaseBuilderClass,
STEP_QUEUE_TRIGGER,
WORKFLOW_QUEUE_TRIGGER,
getWorkflowQueueTrigger,
// biome-ignore lint/security/noGlobalEval: Need to use eval here to avoid TypeScript from transpiling the import statement into `require()`
} = (await eval(
'import("@workflow/builders")'
Expand Down Expand Up @@ -435,7 +435,7 @@ export async function getNextBuilderEager() {
},
workflows: {
maxDuration: 'max',
experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
};

Expand Down
11 changes: 2 additions & 9 deletions packages/sveltekit/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import path from 'node:path';
import { getWorkflowQueueTrigger } from '@workflow/builders';
import fs from 'fs-extra';

import { SvelteKitBuilder } from './builder.js';
Expand All @@ -21,15 +22,7 @@ process.on('beforeExit', () => {
file: '.vercel/output/functions/.well-known/workflow/v1/flow.func/.vc-config.json',
config: {
maxDuration: 'max',
experimentalTriggers: [
{
type: 'queue/v2beta',
topic: '__wkf_workflow_*',
consumer: 'default',
retryAfterSeconds: 5,
initialDelaySeconds: 0,
},
],
experimentalTriggers: [getWorkflowQueueTrigger()],
},
},
{
Expand Down
104 changes: 104 additions & 0 deletions packages/world-vercel/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,110 @@ describe('createQueue', () => {
});
});

describe('strict concurrency (ENFORCE_STRICT_CONCURRENCY)', () => {
let originalDeploymentId: string | undefined;
let originalStrict: string | undefined;

beforeEach(() => {
originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID;
originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY;
process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test';
mockSend.mockResolvedValue({ messageId: 'msg-123' });
});

afterEach(() => {
if (originalDeploymentId !== undefined) {
process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId;
} else {
delete process.env.VERCEL_DEPLOYMENT_ID;
}
if (originalStrict !== undefined) {
process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict;
} else {
delete process.env.ENFORCE_STRICT_CONCURRENCY;
}
});

it('appends runId to the physical flow topic while keeping the logical queueName', async () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';

const queue = createQueue();
await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' });

// send(physicalTopic, wrapper, options)
expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc');
// The logical queue name is preserved so the handler + re-enqueue path
// resolves the same per-run physical topic on the next invocation.
expect(mockSend.mock.calls[0][1].queueName).toBe('__wkf_workflow_test');
});

it('re-enqueues delayed flow messages to the same per-run physical topic', async () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';

let capturedHandler: (
message: unknown,
metadata: unknown
) => Promise<void>;
mockHandleCallback.mockImplementation((handler) => {
capturedHandler = handler;
return async () => new Response('ok');
});

const queue = createQueue();
queue.createQueueHandler('__wkf_workflow_', async () => ({
timeoutSeconds: 300,
}));

await capturedHandler!(
{
payload: { runId: 'wrun_abc' },
queueName: '__wkf_workflow_test',
deploymentId: 'dpl_original',
},
{ messageId: 'msg-123', deliveryCount: 1, createdAt: new Date() }
);

expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc');
});

// TEMP(ci-default-on): skipped while strict concurrency is forced on for CI.
// REVERT BEFORE MERGE (drop the TEMP commit to restore).
it.skip('does not rewrite the topic when the flag is unset', async () => {
delete process.env.ENFORCE_STRICT_CONCURRENCY;

const queue = createQueue();
await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' });

expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test');
});

it('does not rewrite step topics even when the flag is set', async () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';

const queue = createQueue();
await queue.queue('__wkf_step_myStep', {
workflowName: 'test-workflow',
workflowRunId: 'wrun_abc',
workflowStartedAt: Date.now(),
stepId: 'step_xyz',
});

expect(mockSend.mock.calls[0][0]).toBe('__wkf_step_myStep');
});

it('does not rewrite health check topics (no runId) even when the flag is set', async () => {
process.env.ENFORCE_STRICT_CONCURRENCY = '1';

const queue = createQueue();
await queue.queue('__wkf_workflow_health_check', {
__healthCheck: true as const,
correlationId: 'corr_123',
});

expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_health_check');
});
});

describe('createQueueHandler()', () => {
const setupHandler = ({ timeoutSeconds }: { timeoutSeconds: number }) => {
let capturedHandler: (
Expand Down
34 changes: 33 additions & 1 deletion packages/world-vercel/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ function getHeadersFromPayload(
return Object.keys(headers).length > 0 ? headers : undefined;
}

/**
* Resolves the physical VQS topic for a message.
*
* Normally this is just the logical queue name. When `ENFORCE_STRICT_CONCURRENCY`
* is enabled, flow (workflow) messages get a per-run topic by appending the
* run id. VQS scopes `maxConcurrency` per concrete topic, so a per-run topic
* combined with `maxConcurrency: 1` on the flow trigger enforces at most one
* orchestrator invocation per run at a time. Step topics and health checks
* (which carry no `runId`) keep their shared topic.
*/
function getPhysicalQueueName(
queueName: ValidQueueName,
payload: QueuePayload
): string {
// TEMP(ci-default-on): ignore ENFORCE_STRICT_CONCURRENCY so CI exercises the
// per-run topic across all e2e jobs. REVERT BEFORE MERGE — restore the
// `process.env.ENFORCE_STRICT_CONCURRENCY === '1' &&` condition below.
if (
queueName.startsWith('__wkf_workflow_') &&
'runId' in payload &&
typeof payload.runId === 'string'
) {
return `${queueName}_${payload.runId}`;
}
return queueName;
}

type QueueFunction = (
queueName: ValidQueueName,
payload: QueuePayload,
Expand Down Expand Up @@ -236,11 +263,16 @@ export function createQueue(config?: APIConfig): Queue {
// preserving Uint8Array values (workflow input in specVersion >= 2).
const wrapper = {
payload,
// Keep the logical queue name so the handler and re-enqueue path
// resolve the same per-run physical topic on the next invocation.
queueName,
// Store deploymentId in the message so it can be preserved when re-enqueueing
deploymentId: opts?.deploymentId,
};
const sanitizedQueueName = queueName.replace(/[^A-Za-z0-9-_]/g, '-');
const sanitizedQueueName = getPhysicalQueueName(queueName, payload).replace(
/[^A-Za-z0-9-_]/g,
'-'
);
try {
const { messageId } = await client.send(sanitizedQueueName, wrapper, {
idempotencyKey: opts?.idempotencyKey,
Expand Down
Loading