diff --git a/.changeset/enforce-strict-concurrency.md b/.changeset/enforce-strict-concurrency.md new file mode 100644 index 0000000000..12810f85ae --- /dev/null +++ b/.changeset/enforce-strict-concurrency.md @@ -0,0 +1,8 @@ +--- +"@workflow/world-vercel": minor +"@workflow/builders": minor +"@workflow/next": minor +"@workflow/sveltekit": patch +--- + +Add opt-in `WORKFLOW_ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected. diff --git a/docs/content/docs/deploying/world/vercel-world.mdx b/docs/content/docs/deploying/world/vercel-world.mdx index 87e5ddbe27..ab657fbc88 100644 --- a/docs/content/docs/deploying/world/vercel-world.mdx +++ b/docs/content/docs/deploying/world/vercel-world.mdx @@ -111,6 +111,18 @@ Vercel team ID for API requests. Automatically detected. Custom base URL for the Vercel workflow API. Automatically detected. +### `WORKFLOW_ENFORCE_STRICT_CONCURRENCY` + +Set `WORKFLOW_ENFORCE_STRICT_CONCURRENCY=1` to guarantee that **at most one orchestrator (flow) invocation runs at a time per workflow run**. This behavior is off by default; reproduction deployments must explicitly enable the variable before building and running the deployment. Without it, the runtime relies on idempotency and the event log to tolerate concurrent flow invocations of the same run. + +When enabled, each run is given its own queue topic and the flow route is configured with `maxConcurrency: 1`, so [Vercel Queue](https://vercel.com/docs/queues) processes flow messages for a given run strictly one at a time. Step routes are unaffected and continue to run with full concurrency. + + + This variable is read at **both build time and runtime**, so it must be set as a project-level environment variable that applies to your build and your deployed functions. 
Setting it for only one of them produces an inconsistent configuration: a build-only setting puts `maxConcurrency: 1` on the flow trigger without per-run topics, so all runs that share a flow topic are serialized against each other, while a runtime-only setting publishes to per-run topics that no `maxConcurrency` limit serializes. + + Enabling strict concurrency creates one queue topic per run, which increases the number of distinct queues surfaced in queue observability. Leave it off unless you specifically need the per-run serialization guarantee. + + ### Programmatic configuration {/*@skip-typecheck: incomplete code sample*/} diff --git a/docs/content/docs/how-it-works/framework-integrations.mdx b/docs/content/docs/how-it-works/framework-integrations.mdx index 487538670c..a7347cae55 100644 --- a/docs/content/docs/how-it-works/framework-integrations.mdx +++ b/docs/content/docs/how-it-works/framework-integrations.mdx @@ -405,10 +405,13 @@ Two queue topics are created per deployment: | `step.func` | `__wkf_step_*` | Step execution (long-running, `maxDuration: max`) | | `flow.func` | `__wkf_workflow_*` | Workflow orchestration (`maxDuration: 60`) | -If you're building a framework integration that targets Vercel, you should write these triggers into the `.vc-config.json` for each generated function. The `STEP_QUEUE_TRIGGER` and `WORKFLOW_QUEUE_TRIGGER` constants are exported from `@workflow/builders` for this purpose: +If you're building a framework integration that targets Vercel, you should write these triggers into the `.vc-config.json` for each generated function. 
Use `getWorkflowQueueTrigger()` for flow functions so `WORKFLOW_ENFORCE_STRICT_CONCURRENCY=1` is reflected in the generated trigger configuration: ```typescript -import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from "@workflow/builders"; +import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from "@workflow/builders"; + +const flowTriggers = [getWorkflowQueueTrigger()]; +const stepTriggers = [STEP_QUEUE_TRIGGER]; ``` diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 8eaf0281b1..caa57c5898 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -1360,6 +1360,7 @@ export const OPTIONS = handler;`; topic: string; consumer: string; maxDeliveries?: number; + maxConcurrency?: number; retryAfterSeconds?: number; initialDelaySeconds?: number; }>; diff --git a/packages/builders/src/constants.test.ts b/packages/builders/src/constants.test.ts new file mode 100644 index 0000000000..a438af4499 --- /dev/null +++ b/packages/builders/src/constants.test.ts @@ -0,0 +1,46 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js'; + +describe('getWorkflowQueueTrigger', () => { + let originalStrict: string | undefined; + + beforeEach(() => { + originalStrict = process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + }); + + afterEach(() => { + if (originalStrict !== undefined) { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = originalStrict; + } else { + delete process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + } + }); + + it('omits maxConcurrency by default', () => { + delete process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + const trigger = getWorkflowQueueTrigger(); + expect(trigger.topic).toBe('__wkf_workflow_*'); + expect('maxConcurrency' in trigger).toBe(false); + }); + + it('sets maxConcurrency: 1 when WORKFLOW_ENFORCE_STRICT_CONCURRENCY=1', () => { + 
process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + const trigger = getWorkflowQueueTrigger(); + expect(trigger).toMatchObject({ + topic: '__wkf_workflow_*', + maxConcurrency: 1, + }); + }); + + it('does not set maxConcurrency for non-"1" values', () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = 'true'; + const trigger = getWorkflowQueueTrigger(); + expect('maxConcurrency' in trigger).toBe(false); + }); + + it('never applies concurrency to the step trigger', () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + expect('maxConcurrency' in STEP_QUEUE_TRIGGER).toBe(false); + }); +}); diff --git a/packages/builders/src/constants.ts b/packages/builders/src/constants.ts index 0eef6488f7..a838482fee 100644 --- a/packages/builders/src/constants.ts +++ b/packages/builders/src/constants.ts @@ -21,3 +21,25 @@ export const WORKFLOW_QUEUE_TRIGGER = { retryAfterSeconds: 5, // Delay between retries (default: 60) initialDelaySeconds: 0, // Initial delay before first delivery (default: 0) }; + +/** + * Returns the queue trigger configuration for workflow (flow) routes. + * + * When `WORKFLOW_ENFORCE_STRICT_CONCURRENCY` is enabled, sets + * `maxConcurrency: 1` so VQS processes at most one flow invocation per + * concrete topic at a time. Paired with the per-run physical topic naming in + * `@workflow/world-vercel` (which appends the run id to the flow topic), this + * enforces at most one orchestrator invocation per run. Step routes are + * intentionally excluded. + * + * Must be read at build time, where the env var gates what is written into + * the route's `experimentalTriggers` config. 
+ */ +export function getWorkflowQueueTrigger() { + return { + ...WORKFLOW_QUEUE_TRIGGER, + ...(process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY === '1' && { + maxConcurrency: 1, + }), + }; +} diff --git a/packages/builders/src/index.ts b/packages/builders/src/index.ts index e0ea433227..add7908090 100644 --- a/packages/builders/src/index.ts +++ b/packages/builders/src/index.ts @@ -9,7 +9,11 @@ export { getDecoratorOptionsForDirectory, getDecoratorOptionsForDirectoryWithConfigPath, } from './config-helpers.js'; -export { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js'; +export { + getWorkflowQueueTrigger, + STEP_QUEUE_TRIGGER, + WORKFLOW_QUEUE_TRIGGER, +} from './constants.js'; export { createDiscoverEntriesPlugin } from './discover-entries-esbuild-plugin.js'; export { clearModuleSpecifierCache, diff --git a/packages/builders/src/vercel-build-output-api.ts b/packages/builders/src/vercel-build-output-api.ts index 30fc722b15..093cf49633 100644 --- a/packages/builders/src/vercel-build-output-api.ts +++ b/packages/builders/src/vercel-build-output-api.ts @@ -1,7 +1,7 @@ import { copyFile, mkdir, writeFile } from 'node:fs/promises'; import { join, resolve } from 'node:path'; import { BaseBuilder } from './base-builder.js'; -import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js'; +import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js'; export class VercelBuildOutputAPIBuilder extends BaseBuilder { async build(): Promise { @@ -114,7 +114,7 @@ export class VercelBuildOutputAPIBuilder extends BaseBuilder { await this.createPackageJson(workflowsFuncDir, 'commonjs'); await this.createVcConfig(workflowsFuncDir, { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], runtime: this.config.runtime, }); diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 63e1172bf9..01b30fa05f 100644 --- 
a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -417,7 +417,11 @@ async function describeStuckRun( // settled yet. `completed` past the poll budget is downgraded to a non-gating // SLOW_COMPLETION (slow, not wedged); `failed`/`cancelled` keep their meaning. function classifyTerminalRun( - runData: { status: string; errorCode?: string }, + // A failed WorkflowRun carries its reason in `error: { code, message }` — the + // run has no top-level `errorCode`. Reading the structured error is what lets + // us classify USER_ERROR/RUNTIME_ERROR/CORRUPTED_EVENT_LOG (vs. uncategorised + // `other`) and surface *why* it failed in the summary. + runData: { status: string; error?: { code?: string; message?: string } }, context: { runId: string; scenario: Scenario; @@ -450,8 +454,9 @@ function classifyTerminalRun( if (runData.status === 'failed') { return { ...base, - outcome: classifyFailure(runData.errorCode), - errorCode: runData.errorCode, + outcome: classifyFailure(runData.error?.code), + errorCode: runData.error?.code, + errorMessage: runData.error?.message, }; } @@ -1017,44 +1022,42 @@ describe('event log race repro', () => { } }); - test( - 'event log races do not corrupt, stall, or take stale branches', - { timeout: testTimeoutMs }, - async () => { - const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); - const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2); - const results = [ - ...(await runScenario( - config.hookSleepAttempts, - config.concurrency, - runHookSleepAttempt - )), - ...(await runScenario( - config.stepFanoutAttempts, - config.stepConcurrency, - runStepFanoutAttempt - )), - ...(await runScenario( - stepBiasedAttempts, - config.stepConcurrency, - (attempt) => runStepSleepRaceAttempt(attempt, 'step') - )), - ...(await runScenario( - sleepBiasedAttempts, - config.stepConcurrency, - (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') - )), - ]; - 
writeResults(results); - - // Only event-log regressions fail the job. `infra` outcomes are - // harness-side timing races (hook resume vs. sleep budget) and transport - // errors — they are recorded and surfaced in the summary, but do not - // gate, matching `--check` in the renderer script. - const regressions = results.filter( - (result) => result.outcome !== 'completed' && result.outcome !== 'infra' - ); - expect(regressions).toEqual([]); - } - ); + test('event log races do not corrupt, stall, or take stale branches', { + timeout: testTimeoutMs, + }, async () => { + const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); + const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2); + const results = [ + ...(await runScenario( + config.hookSleepAttempts, + config.concurrency, + runHookSleepAttempt + )), + ...(await runScenario( + config.stepFanoutAttempts, + config.stepConcurrency, + runStepFanoutAttempt + )), + ...(await runScenario( + stepBiasedAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'step') + )), + ...(await runScenario( + sleepBiasedAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') + )), + ]; + writeResults(results); + + // Only event-log regressions fail the job. `infra` outcomes are + // harness-side timing races (hook resume vs. sleep budget) and transport + // errors — they are recorded and surfaced in the summary, but do not + // gate, matching `--check` in the renderer script. 
+ const regressions = results.filter( + (result) => result.outcome !== 'completed' && result.outcome !== 'infra' + ); + expect(regressions).toEqual([]); + }); }); diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index 5fc8c35a76..7308fd8b04 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -55,7 +55,7 @@ export async function getNextBuilderDeferred() { const { BaseBuilder: BaseBuilderClass, STEP_QUEUE_TRIGGER, - WORKFLOW_QUEUE_TRIGGER, + getWorkflowQueueTrigger, applySwcTransform, detectWorkflowPatterns, getImportPath, @@ -1117,7 +1117,7 @@ export async function getNextBuilderDeferred() { }, workflows: { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }; diff --git a/packages/next/src/builder-eager.ts b/packages/next/src/builder-eager.ts index 384dd19af8..9784720fe6 100644 --- a/packages/next/src/builder-eager.ts +++ b/packages/next/src/builder-eager.ts @@ -17,7 +17,7 @@ export async function getNextBuilderEager() { const { BaseBuilder: BaseBuilderClass, STEP_QUEUE_TRIGGER, - WORKFLOW_QUEUE_TRIGGER, + getWorkflowQueueTrigger, // biome-ignore lint/security/noGlobalEval: Need to use eval here to avoid TypeScript from transpiling the import statement into `require()` } = (await eval( 'import("@workflow/builders")' @@ -435,7 +435,7 @@ export async function getNextBuilderEager() { }, workflows: { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }; diff --git a/packages/sveltekit/src/index.ts b/packages/sveltekit/src/index.ts index 55ba9b6f70..12eab34b6d 100644 --- a/packages/sveltekit/src/index.ts +++ b/packages/sveltekit/src/index.ts @@ -1,4 +1,5 @@ import path from 'node:path'; +import { getWorkflowQueueTrigger } from '@workflow/builders'; import fs from 'fs-extra'; import { SvelteKitBuilder } from './builder.js'; @@ -21,15 
+22,7 @@ process.on('beforeExit', () => { file: '.vercel/output/functions/.well-known/workflow/v1/flow.func/.vc-config.json', config: { maxDuration: 'max', - experimentalTriggers: [ - { - type: 'queue/v2beta', - topic: '__wkf_workflow_*', - consumer: 'default', - retryAfterSeconds: 5, - initialDelaySeconds: 0, - }, - ], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }, { diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b8b2490af8..f34264fa04 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -327,6 +327,108 @@ describe('createQueue', () => { }); }); + describe('strict concurrency (WORKFLOW_ENFORCE_STRICT_CONCURRENCY)', () => { + let originalDeploymentId: string | undefined; + let originalStrict: string | undefined; + + beforeEach(() => { + originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + originalStrict = process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + mockSend.mockResolvedValue({ messageId: 'msg-123' }); + }); + + afterEach(() => { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + if (originalStrict !== undefined) { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = originalStrict; + } else { + delete process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + } + }); + + it('appends runId to the physical flow topic while keeping the logical queueName', async () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' }); + + // send(physicalTopic, wrapper, options) + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc'); + // The logical queue name is preserved so the handler + re-enqueue path + // resolves the same per-run physical topic on the next invocation. 
+ expect(mockSend.mock.calls[0][1].queueName).toBe('__wkf_workflow_test'); + }); + + it('re-enqueues delayed flow messages to the same per-run physical topic', async () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 300, + })); + + await capturedHandler!( + { + payload: { runId: 'wrun_abc' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { messageId: 'msg-123', deliveryCount: 1, createdAt: new Date() } + ); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc'); + }); + + it('does not rewrite the topic when the flag is unset', async () => { + delete process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test'); + }); + + it('does not rewrite step topics even when the flag is set', async () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_step_myStep', { + workflowName: 'test-workflow', + workflowRunId: 'wrun_abc', + workflowStartedAt: Date.now(), + stepId: 'step_xyz', + }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_step_myStep'); + }); + + it('does not rewrite health check topics (no runId) even when the flag is set', async () => { + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_health_check', { + __healthCheck: true as const, + correlationId: 'corr_123', + }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_health_check'); + }); + }); + 
describe('createQueueHandler()', () => { const setupHandler = ({ timeoutSeconds }: { timeoutSeconds: number }) => { let capturedHandler: ( diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 8fdab46579..e61a065d6f 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -175,6 +175,32 @@ function getHeadersFromPayload( return Object.keys(headers).length > 0 ? headers : undefined; } +/** + * Resolves the physical VQS topic for a message. + * + * Normally this is just the logical queue name. When + * `WORKFLOW_ENFORCE_STRICT_CONCURRENCY` is enabled, flow (workflow) messages + * get a per-run topic by appending the run id. VQS scopes `maxConcurrency` + * per concrete topic, so a per-run topic combined with `maxConcurrency: 1` + * on the flow trigger enforces at most one orchestrator invocation per run + * at a time. Step topics and health checks (which carry no `runId`) keep + * their shared topic. + */ +function getPhysicalQueueName( + queueName: ValidQueueName, + payload: QueuePayload +): string { + if ( + process.env.WORKFLOW_ENFORCE_STRICT_CONCURRENCY === '1' && + queueName.startsWith('__wkf_workflow_') && + 'runId' in payload && + typeof payload.runId === 'string' + ) { + return `${queueName}_${payload.runId}`; + } + return queueName; +} + type QueueFunction = ( queueName: ValidQueueName, payload: QueuePayload, @@ -236,11 +262,16 @@ export function createQueue(config?: APIConfig): Queue { // preserving Uint8Array values (workflow input in specVersion >= 2). const wrapper = { payload, + // Keep the logical queue name so the handler and re-enqueue path + // resolve the same per-run physical topic on the next invocation. 
queueName, // Store deploymentId in the message so it can be preserved when re-enqueueing deploymentId: opts?.deploymentId, }; - const sanitizedQueueName = queueName.replace(/[^A-Za-z0-9-_]/g, '-'); + const sanitizedQueueName = getPhysicalQueueName(queueName, payload).replace( + /[^A-Za-z0-9-_]/g, + '-' + ); try { const { messageId } = await client.send(sanitizedQueueName, wrapper, { idempotencyKey: opts?.idempotencyKey,