diff --git a/.changeset/enforce-strict-concurrency.md b/.changeset/enforce-strict-concurrency.md new file mode 100644 index 0000000000..58dcd10e82 --- /dev/null +++ b/.changeset/enforce-strict-concurrency.md @@ -0,0 +1,8 @@ +--- +"@workflow/world-vercel": minor +"@workflow/builders": minor +"@workflow/next": minor +"@workflow/sveltekit": patch +--- + +Add opt-in `ENFORCE_STRICT_CONCURRENCY` env var. When set to `1`, flow (orchestrator) routes are limited to one invocation per run at a time via a per-run queue topic and `maxConcurrency: 1` on the flow trigger. Step routes are unaffected. diff --git a/.changeset/sleep-resumeat-no-recorded-value.md b/.changeset/sleep-resumeat-no-recorded-value.md new file mode 100644 index 0000000000..02acb828c9 --- /dev/null +++ b/.changeset/sleep-resumeat-no-recorded-value.md @@ -0,0 +1,5 @@ +--- +'@workflow/core': patch +--- + +Fix a spurious `CorruptedEventLogError` on replay when a duration-based `sleep()`'s `wait_completed` is validated without a recorded `wait_created` value. diff --git a/docs/content/docs/deploying/world/vercel-world.mdx b/docs/content/docs/deploying/world/vercel-world.mdx index 87e5ddbe27..16fa264f45 100644 --- a/docs/content/docs/deploying/world/vercel-world.mdx +++ b/docs/content/docs/deploying/world/vercel-world.mdx @@ -111,6 +111,18 @@ Vercel team ID for API requests. Automatically detected. Custom base URL for the Vercel workflow API. Automatically detected. +### `ENFORCE_STRICT_CONCURRENCY` + +Set to `1` to guarantee that **at most one orchestrator (flow) invocation runs at a time per workflow run**. By default, the runtime relies on idempotency and the event log to tolerate concurrent flow invocations of the same run; enabling this opt-in mode adds a hard concurrency limit on top. + +When enabled, each run is given its own queue topic and the flow route is configured with `maxConcurrency: 1`, so [Vercel Queue](https://vercel.com/docs/queues) processes flow messages for a given run strictly one at a time. Step routes are unaffected and continue to run with full concurrency. + + + This variable is read at **both build time and runtime**, so it must be set as a project-level environment variable that applies to your build and your deployed functions. Setting it for only one will produce an inconsistent configuration. + + Enabling strict concurrency creates one queue topic per run, which increases the number of distinct queues surfaced in queue observability. Leave it off unless you specifically need the per-run serialization guarantee. + + ### Programmatic configuration {/*@skip-typecheck: incomplete code sample*/} diff --git a/packages/builders/src/base-builder.ts b/packages/builders/src/base-builder.ts index 8eaf0281b1..caa57c5898 100644 --- a/packages/builders/src/base-builder.ts +++ b/packages/builders/src/base-builder.ts @@ -1360,6 +1360,7 @@ export const OPTIONS = handler;`; topic: string; consumer: string; maxDeliveries?: number; + maxConcurrency?: number; retryAfterSeconds?: number; initialDelaySeconds?: number; }>; diff --git a/packages/builders/src/constants.test.ts b/packages/builders/src/constants.test.ts new file mode 100644 index 0000000000..3420bcb01b --- /dev/null +++ b/packages/builders/src/constants.test.ts @@ -0,0 +1,50 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js'; + +describe('getWorkflowQueueTrigger', () => { + let originalStrict: string | undefined; + + beforeEach(() => { + originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY; + }); + + afterEach(() => { + if (originalStrict !== undefined) { + process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict; + } else { + delete process.env.ENFORCE_STRICT_CONCURRENCY; + } + }); + + // TEMP(ci-default-on): skipped while strict concurrency is forced on for CI. + // REVERT BEFORE MERGE (drop the TEMP commit to restore). + it.skip('omits maxConcurrency by default', () => { + delete process.env.ENFORCE_STRICT_CONCURRENCY; + const trigger = getWorkflowQueueTrigger(); + expect(trigger.topic).toBe('__wkf_workflow_*'); + expect('maxConcurrency' in trigger).toBe(false); + }); + + it('sets maxConcurrency: 1 when ENFORCE_STRICT_CONCURRENCY=1', () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + const trigger = getWorkflowQueueTrigger(); + expect(trigger).toMatchObject({ + topic: '__wkf_workflow_*', + maxConcurrency: 1, + }); + }); + + // TEMP(ci-default-on): skipped while strict concurrency is forced on for CI. + // REVERT BEFORE MERGE (drop the TEMP commit to restore). + it.skip('does not set maxConcurrency for non-"1" values', () => { + process.env.ENFORCE_STRICT_CONCURRENCY = 'true'; + const trigger = getWorkflowQueueTrigger(); + expect('maxConcurrency' in trigger).toBe(false); + }); + + it('never applies concurrency to the step trigger', () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + expect('maxConcurrency' in STEP_QUEUE_TRIGGER).toBe(false); + }); +}); diff --git a/packages/builders/src/constants.ts b/packages/builders/src/constants.ts index 0eef6488f7..954cf9021b 100644 --- a/packages/builders/src/constants.ts +++ b/packages/builders/src/constants.ts @@ -21,3 +21,26 @@ export const WORKFLOW_QUEUE_TRIGGER = { retryAfterSeconds: 5, // Delay between retries (default: 60) initialDelaySeconds: 0, // Initial delay before first delivery (default: 0) }; + +/** + * Returns the queue trigger configuration for workflow (flow) routes. + * + * When `ENFORCE_STRICT_CONCURRENCY` is enabled, sets `maxConcurrency: 1` so + * VQS processes at most one flow invocation per concrete topic at a time. + * Paired with the per-run physical topic naming in `@workflow/world-vercel` + * (which appends the run id to the flow topic), this enforces at most one + * orchestrator invocation per run. Step routes are intentionally excluded. + * + * Must be read at build time, where the env var gates what is written into + * the route's `experimentalTriggers` config. + */ +export function getWorkflowQueueTrigger() { + // TEMP(ci-default-on): force strict concurrency ON regardless of the env var so + // CI exercises maxConcurrency across all e2e jobs. REVERT BEFORE MERGE — restore + // the `...(process.env.ENFORCE_STRICT_CONCURRENCY === '1' && { maxConcurrency: 1 })` + // gate. + return { + ...WORKFLOW_QUEUE_TRIGGER, + maxConcurrency: 1, + }; +} diff --git a/packages/builders/src/index.ts b/packages/builders/src/index.ts index e0ea433227..add7908090 100644 --- a/packages/builders/src/index.ts +++ b/packages/builders/src/index.ts @@ -9,7 +9,11 @@ export { getDecoratorOptionsForDirectory, getDecoratorOptionsForDirectoryWithConfigPath, } from './config-helpers.js'; -export { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js'; +export { + getWorkflowQueueTrigger, + STEP_QUEUE_TRIGGER, + WORKFLOW_QUEUE_TRIGGER, +} from './constants.js'; export { createDiscoverEntriesPlugin } from './discover-entries-esbuild-plugin.js'; export { clearModuleSpecifierCache, diff --git a/packages/builders/src/vercel-build-output-api.ts b/packages/builders/src/vercel-build-output-api.ts index 30fc722b15..093cf49633 100644 --- a/packages/builders/src/vercel-build-output-api.ts +++ b/packages/builders/src/vercel-build-output-api.ts @@ -1,7 +1,7 @@ import { copyFile, mkdir, writeFile } from 'node:fs/promises'; import { join, resolve } from 'node:path'; import { BaseBuilder } from './base-builder.js'; -import { STEP_QUEUE_TRIGGER, WORKFLOW_QUEUE_TRIGGER } from './constants.js'; +import { getWorkflowQueueTrigger, STEP_QUEUE_TRIGGER } from './constants.js'; export class VercelBuildOutputAPIBuilder extends BaseBuilder { async build(): Promise { @@ -114,7 +114,7 @@ export class VercelBuildOutputAPIBuilder extends BaseBuilder { await this.createPackageJson(workflowsFuncDir, 'commonjs'); await this.createVcConfig(workflowsFuncDir, { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], runtime: this.config.runtime, }); diff --git a/packages/core/e2e/event-log-race-repro.test.ts b/packages/core/e2e/event-log-race-repro.test.ts index 63e1172bf9..01b30fa05f 100644 --- a/packages/core/e2e/event-log-race-repro.test.ts +++ b/packages/core/e2e/event-log-race-repro.test.ts @@ -417,7 +417,11 @@ async function describeStuckRun( // settled yet. `completed` past the poll budget is downgraded to a non-gating // SLOW_COMPLETION (slow, not wedged); `failed`/`cancelled` keep their meaning. function classifyTerminalRun( - runData: { status: string; errorCode?: string }, + // A failed WorkflowRun carries its reason in `error: { code, message }` — the + // run has no top-level `errorCode`. Reading the structured error is what lets + // us classify USER_ERROR/RUNTIME_ERROR/CORRUPTED_EVENT_LOG (vs. uncategorised + // `other`) and surface *why* it failed in the summary. + runData: { status: string; error?: { code?: string; message?: string } }, context: { runId: string; scenario: Scenario; @@ -450,8 +454,9 @@ function classifyTerminalRun( if (runData.status === 'failed') { return { ...base, - outcome: classifyFailure(runData.errorCode), - errorCode: runData.errorCode, + outcome: classifyFailure(runData.error?.code), + errorCode: runData.error?.code, + errorMessage: runData.error?.message, }; } @@ -1017,44 +1022,42 @@ describe('event log race repro', () => { } }); - test( - 'event log races do not corrupt, stall, or take stale branches', - { timeout: testTimeoutMs }, - async () => { - const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); - const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2); - const results = [ - ...(await runScenario( - config.hookSleepAttempts, - config.concurrency, - runHookSleepAttempt - )), - ...(await runScenario( - config.stepFanoutAttempts, - config.stepConcurrency, - runStepFanoutAttempt - )), - ...(await runScenario( - stepBiasedAttempts, - config.stepConcurrency, - (attempt) => runStepSleepRaceAttempt(attempt, 'step') - )), - ...(await runScenario( - sleepBiasedAttempts, - config.stepConcurrency, - (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') - )), - ]; - writeResults(results); - - // Only event-log regressions fail the job. `infra` outcomes are - // harness-side timing races (hook resume vs. sleep budget) and transport - // errors — they are recorded and surfaced in the summary, but do not - // gate, matching `--check` in the renderer script. - const regressions = results.filter( - (result) => result.outcome !== 'completed' && result.outcome !== 'infra' - ); - expect(regressions).toEqual([]); - } - ); + test('event log races do not corrupt, stall, or take stale branches', { + timeout: testTimeoutMs, + }, async () => { + const stepBiasedAttempts = Math.ceil(config.stepSleepRaceAttempts / 2); + const sleepBiasedAttempts = Math.floor(config.stepSleepRaceAttempts / 2); + const results = [ + ...(await runScenario( + config.hookSleepAttempts, + config.concurrency, + runHookSleepAttempt + )), + ...(await runScenario( + config.stepFanoutAttempts, + config.stepConcurrency, + runStepFanoutAttempt + )), + ...(await runScenario( + stepBiasedAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'step') + )), + ...(await runScenario( + sleepBiasedAttempts, + config.stepConcurrency, + (attempt) => runStepSleepRaceAttempt(attempt, 'sleep') + )), + ]; + writeResults(results); + + // Only event-log regressions fail the job. `infra` outcomes are + // harness-side timing races (hook resume vs. sleep budget) and transport + // errors — they are recorded and surfaced in the summary, but do not + // gate, matching `--check` in the renderer script. + const regressions = results.filter( + (result) => result.outcome !== 'completed' && result.outcome !== 'infra' + ); + expect(regressions).toEqual([]); + }); }); diff --git a/packages/core/src/workflow/sleep.test.ts b/packages/core/src/workflow/sleep.test.ts index 90dbfa4465..61ee438da4 100644 --- a/packages/core/src/workflow/sleep.test.ts +++ b/packages/core/src/workflow/sleep.test.ts @@ -10,14 +10,20 @@ import type { WorkflowOrchestratorContext } from '../private.js'; import { createContext } from '../vm/index.js'; import { createSleep } from './sleep.js'; +const DEFAULT_FIXED_TIMESTAMP = 1753481739458; + // Helper to setup context to simulate a workflow run -function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { +function setupWorkflowContext( + events: Event[] +): WorkflowOrchestratorContext & { updateTimestamp: (ts: number) => void } { const context = createContext({ seed: 'test', - fixedTimestamp: 1753481739458, + fixedTimestamp: DEFAULT_FIXED_TIMESTAMP, }); const ulid = monotonicFactory(() => context.globalThis.Math.random()); - const workflowStartedAt = context.globalThis.Date.now(); + // The workflow's ULID seed (correlationIds) is derived from the original + // start time and is stable across replays, even if the wall clock advances. + const workflowStartedAt = DEFAULT_FIXED_TIMESTAMP; const ctx: WorkflowOrchestratorContext = { runId: 'wrun_test', encryptionKey: undefined, @@ -43,7 +49,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext { pendingDeliveries: 0, pendingDeliveryBarriers: new Map(), }; - return ctx; + return Object.assign(ctx, { updateTimestamp: context.updateTimestamp }); } describe('createSleep', () => { @@ -143,6 +149,94 @@ describe('createSleep', () => { expect(workflowError?.message).toContain('wait_01K11TFZ62YS0YYFDQ3E8B9YCV'); }); + it('does not flag wait_completed.resumeAt when no wait_created was applied and the replay clock advanced', async () => { + // Production replay divergence (reused-sleep): a `wait_completed` is + // consumed by a sleep consumer that never consumed a `wait_created` + // (hasCreatedEvent=false), so the queue item still holds the value freshly + // computed by parseDurationToDate(duration) = Date.now() + duration. + // + // Because `sleep()` resumeAt is wall-clock-relative and the + // VM clock advances to each event's createdAt during replay, that fresh + // value differs from the absolute resumeAt the ORIGINAL run recorded into + // the event. The recorded resumeAt is the source of truth; the consumer's + // recomputed value is non-deterministic and must NOT be treated as the + // expected value. Captured from production: hasCreatedEvent=false with an + // ~18-42s delta between the recomputed and recorded resumeAt. + const recordedResumeAt = new Date(DEFAULT_FIXED_TIMESTAMP + 5_000); + + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_123', + eventType: 'wait_completed', + correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { resumeAt: recordedResumeAt }, + createdAt: new Date(DEFAULT_FIXED_TIMESTAMP + 5_100), + }, + ]); + + // Simulate replay drift: the wall clock has advanced 30s by the time the + // workflow re-invokes sleep(5000), so parseDurationToDate computes a + // resumeAt 30s ahead of the recorded one (the correlationId ULID seed is + // unchanged — it derives from the original start time). + ctx.updateTimestamp(DEFAULT_FIXED_TIMESTAMP + 30_000); + + const sleepError = withResolvers(); + ctx.onWorkflowError = sleepError.resolve; + + const sleep = createSleep(ctx); + const slept = sleep(5_000); + + // The bug raises CorruptedEventLogError (and never resolves the sleep); + // the fix lets the sleep resolve with no error. Race the two terminal + // outcomes directly — no timing guard — so a regression surfaces as the + // error branch (or a hang caught by the test timeout), never a flaky race + // against a fixed grace period. + const outcome = await Promise.race([ + sleepError.promise.then((err) => ({ kind: 'error' as const, err })), + slept.then(() => ({ kind: 'resolved' as const })), + ]); + + if (outcome.kind === 'error') { + throw new Error( + `Unexpected workflow error on consistent replay: ${outcome.err.message}` + ); + } + expect(outcome.kind).toBe('resolved'); + expect(ctx.invocationsQueue.size).toBe(0); + }); + + it('flags an invalid wait_completed.resumeAt even when no wait_created was applied', async () => { + // Counterpart to the test above: skipping the equality check without a + // recorded value must NOT also swallow a malformed resumeAt. A non-finite + // resumeAt is corrupt data regardless of `hasCreatedEvent` — the original + // run always records a valid parseDurationToDate(...) Date, so a consistent + // log never carries one. Here there is no `wait_created` (hasCreatedEvent + // stays false) yet the Invalid Date must still raise. + const ctx = setupWorkflowContext([ + { + eventId: 'evnt_0', + runId: 'wrun_123', + eventType: 'wait_completed', + correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { resumeAt: new Date(Number.NaN) }, + createdAt: new Date(DEFAULT_FIXED_TIMESTAMP + 5_100), + }, + ]); + + const errorReceived = withResolvers(); + ctx.onWorkflowError = errorReceived.resolve; + + const sleep = createSleep(ctx); + void sleep(5_000); + + const workflowError = await errorReceived.promise; + expect(workflowError).toBeInstanceOf(CorruptedEventLogError); + expect(workflowError?.message).toContain('wait_completed'); + expect(workflowError?.message).toContain('Invalid Date'); + expect(workflowError?.message).toContain('wait_01K11TFZ62YS0YYFDQ3E8B9YCV'); + }); + it('should invoke workflow error handler when wait_completed resumeAt is invalid', async () => { const ctx = setupWorkflowContext([ { diff --git a/packages/core/src/workflow/sleep.ts b/packages/core/src/workflow/sleep.ts index 93c62216a0..f07221ff23 100644 --- a/packages/core/src/workflow/sleep.ts +++ b/packages/core/src/workflow/sleep.ts @@ -1,8 +1,13 @@ import { CorruptedEventLogError } from '@workflow/errors'; import { parseDurationToDate, withResolvers } from '@workflow/utils'; +import type { Event } from '@workflow/world'; import type { StringValue } from 'ms'; import { EventConsumerResult } from '../events-consumer.js'; -import { type WaitInvocationQueueItem, WorkflowSuspension } from '../global.js'; +import { + type QueueItem, + type WaitInvocationQueueItem, + WorkflowSuspension, +} from '../global.js'; import { awaitEarlierDeliveries, registerDeliveryBarrier, @@ -10,6 +15,70 @@ import { type WorkflowOrchestratorContext, } from '../private.js'; +/** + * Validates a `wait_completed` event's recorded `resumeAt` against the + * authoritative value for this wait, returning an error message string if the + * log is genuinely corrupted, or `null` otherwise. + * + * The authoritative `resumeAt` is the one recorded in the event log and applied + * to the queue item via `wait_created` (`hasCreatedEvent === true`). A + * duration-based `sleep()` otherwise derives `resumeAt` from + * `Date.now()` (see {@link parseDurationToDate}), which is wall-clock-relative + * and therefore NOT deterministic across replays: the original run computed + * `start + duration`, while a replay — whose VM clock has advanced to each + * event's `createdAt` — recomputes a different absolute timestamp. + * + * When this consumer never applied a `wait_created` (`hasCreatedEvent` falsy), + * the queue item still holds that freshly-recomputed value, so comparing it + * against the recorded `wait_completed.resumeAt` yields a false mismatch on a + * perfectly consistent log. The correlationId match already establishes the + * wait's identity, so the equality check is skipped in that case. + * + * A non-finite / unparseable `resumeAt`, however, is malformed irrespective of + * any authoritative value — the original run always records a valid + * `parseDurationToDate(...)` Date, so a consistent log never carries one. That + * is flagged unconditionally, before the `hasCreatedEvent` gate. + */ +function detectResumeAtMismatch( + correlationId: string, + event: Extract, + queueItem: QueueItem | undefined +): string | null { + const eventResumeAt = event.eventData?.resumeAt; + if (eventResumeAt === undefined) { + return null; + } + + const eventResumeAtDate = new Date(eventResumeAt); + const eventResumeAtMs = eventResumeAtDate.getTime(); + + // An Invalid/non-finite resumeAt is corrupt data regardless of whether an + // authoritative recorded value exists, so do not gate this on + // `hasCreatedEvent` — a consistent log never produces one. + if (!Number.isFinite(eventResumeAtMs)) { + return ( + `Corrupted event log: wait_completed event for ${correlationId} has ` + + `invalid resumeAt "${String(eventResumeAt)}"` + ); + } + + if (!queueItem || queueItem.type !== 'wait' || !queueItem.hasCreatedEvent) { + // No authoritative recorded resumeAt to compare a finite value against. + return null; + } + + const expectedResumeAt = queueItem.resumeAt; + if (eventResumeAtMs === expectedResumeAt.getTime()) { + return null; + } + + return ( + `Corrupted event log: wait_completed event for ${correlationId} has ` + + `resumeAt "${eventResumeAtDate.toISOString()}", but the current wait ` + + `consumer expects "${expectedResumeAt.toISOString()}"` + ); +} + export function createSleep(ctx: WorkflowOrchestratorContext) { return async function sleepImpl( param: StringValue | Date | number @@ -59,30 +128,17 @@ export function createSleep(ctx: WorkflowOrchestratorContext) { // Check for wait_completed event if (event.eventType === 'wait_completed') { - const eventResumeAt = event.eventData?.resumeAt; - if (eventResumeAt !== undefined) { - const queueItem = ctx.invocationsQueue.get(correlationId); - const expectedResumeAt = - queueItem && queueItem.type === 'wait' - ? queueItem.resumeAt - : resumeAt; - const eventResumeAtDate = new Date(eventResumeAt); - const eventResumeAtMs = eventResumeAtDate.getTime(); - const expectedResumeAtMs = expectedResumeAt.getTime(); - const eventResumeAtForMessage = Number.isFinite(eventResumeAtMs) - ? eventResumeAtDate.toISOString() - : String(eventResumeAt); - - if (eventResumeAtMs !== expectedResumeAtMs) { - ctx.promiseQueue = ctx.promiseQueue.then(() => { - ctx.onWorkflowError( - new CorruptedEventLogError( - `Corrupted event log: wait_completed event for ${correlationId} has resumeAt "${eventResumeAtForMessage}", but the current wait consumer expects "${expectedResumeAt.toISOString()}"` - ) - ); - }); - return EventConsumerResult.Finished; - } + const queueItem = ctx.invocationsQueue.get(correlationId); + const mismatch = detectResumeAtMismatch( + correlationId, + event, + queueItem + ); + if (mismatch) { + ctx.promiseQueue = ctx.promiseQueue.then(() => { + ctx.onWorkflowError(new CorruptedEventLogError(mismatch)); + }); + return EventConsumerResult.Finished; } // Remove this wait from the invocations queue (O(1) delete using Map) diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index 5fc8c35a76..7308fd8b04 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -55,7 +55,7 @@ export async function getNextBuilderDeferred() { const { BaseBuilder: BaseBuilderClass, STEP_QUEUE_TRIGGER, - WORKFLOW_QUEUE_TRIGGER, + getWorkflowQueueTrigger, applySwcTransform, detectWorkflowPatterns, getImportPath, @@ -1117,7 +1117,7 @@ export async function getNextBuilderDeferred() { }, workflows: { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }; diff --git a/packages/next/src/builder-eager.ts b/packages/next/src/builder-eager.ts index 384dd19af8..9784720fe6 100644 --- a/packages/next/src/builder-eager.ts +++ b/packages/next/src/builder-eager.ts @@ -17,7 +17,7 @@ export async function getNextBuilderEager() { const { BaseBuilder: BaseBuilderClass, STEP_QUEUE_TRIGGER, - WORKFLOW_QUEUE_TRIGGER, + getWorkflowQueueTrigger, // biome-ignore lint/security/noGlobalEval: Need to use eval here to avoid TypeScript from transpiling the import statement into `require()` } = (await eval( 'import("@workflow/builders")' @@ -435,7 +435,7 @@ export async function getNextBuilderEager() { }, workflows: { maxDuration: 'max', - experimentalTriggers: [WORKFLOW_QUEUE_TRIGGER], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }; diff --git a/packages/sveltekit/src/index.ts b/packages/sveltekit/src/index.ts index 55ba9b6f70..12eab34b6d 100644 --- a/packages/sveltekit/src/index.ts +++ b/packages/sveltekit/src/index.ts @@ -1,4 +1,5 @@ import path from 'node:path'; +import { getWorkflowQueueTrigger } from '@workflow/builders'; import fs from 'fs-extra'; import { SvelteKitBuilder } from './builder.js'; @@ -21,15 +22,7 @@ process.on('beforeExit', () => { file: '.vercel/output/functions/.well-known/workflow/v1/flow.func/.vc-config.json', config: { maxDuration: 'max', - experimentalTriggers: [ - { - type: 'queue/v2beta', - topic: '__wkf_workflow_*', - consumer: 'default', - retryAfterSeconds: 5, - initialDelaySeconds: 0, - }, - ], + experimentalTriggers: [getWorkflowQueueTrigger()], }, }, { diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b8b2490af8..e78c3dc5c9 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -327,6 +327,110 @@ describe('createQueue', () => { }); }); + describe('strict concurrency (ENFORCE_STRICT_CONCURRENCY)', () => { + let originalDeploymentId: string | undefined; + let originalStrict: string | undefined; + + beforeEach(() => { + originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + originalStrict = process.env.ENFORCE_STRICT_CONCURRENCY; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + mockSend.mockResolvedValue({ messageId: 'msg-123' }); + }); + + afterEach(() => { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + if (originalStrict !== undefined) { + process.env.ENFORCE_STRICT_CONCURRENCY = originalStrict; + } else { + delete process.env.ENFORCE_STRICT_CONCURRENCY; + } + }); + + it('appends runId to the physical flow topic while keeping the logical queueName', async () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' }); + + // send(physicalTopic, wrapper, options) + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc'); + // The logical queue name is preserved so the handler + re-enqueue path + // resolves the same per-run physical topic on the next invocation. + expect(mockSend.mock.calls[0][1].queueName).toBe('__wkf_workflow_test'); + }); + + it('re-enqueues delayed flow messages to the same per-run physical topic', async () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 300, + })); + + await capturedHandler!( + { + payload: { runId: 'wrun_abc' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { messageId: 'msg-123', deliveryCount: 1, createdAt: new Date() } + ); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test_wrun_abc'); + }); + + // TEMP(ci-default-on): skipped while strict concurrency is forced on for CI. + // REVERT BEFORE MERGE (drop the TEMP commit to restore). + it.skip('does not rewrite the topic when the flag is unset', async () => { + delete process.env.ENFORCE_STRICT_CONCURRENCY; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_test', { runId: 'wrun_abc' }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_test'); + }); + + it('does not rewrite step topics even when the flag is set', async () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_step_myStep', { + workflowName: 'test-workflow', + workflowRunId: 'wrun_abc', + workflowStartedAt: Date.now(), + stepId: 'step_xyz', + }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_step_myStep'); + }); + + it('does not rewrite health check topics (no runId) even when the flag is set', async () => { + process.env.ENFORCE_STRICT_CONCURRENCY = '1'; + + const queue = createQueue(); + await queue.queue('__wkf_workflow_health_check', { + __healthCheck: true as const, + correlationId: 'corr_123', + }); + + expect(mockSend.mock.calls[0][0]).toBe('__wkf_workflow_health_check'); + }); + }); + describe('createQueueHandler()', () => { const setupHandler = ({ timeoutSeconds }: { timeoutSeconds: number }) => { let capturedHandler: ( diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 8fdab46579..a003a3e957 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -175,6 +175,33 @@ function getHeadersFromPayload( return Object.keys(headers).length > 0 ? headers : undefined; } +/** + * Resolves the physical VQS topic for a message. + * + * Normally this is just the logical queue name. When `ENFORCE_STRICT_CONCURRENCY` + * is enabled, flow (workflow) messages get a per-run topic by appending the + * run id. VQS scopes `maxConcurrency` per concrete topic, so a per-run topic + * combined with `maxConcurrency: 1` on the flow trigger enforces at most one + * orchestrator invocation per run at a time. Step topics and health checks + * (which carry no `runId`) keep their shared topic. + */ +function getPhysicalQueueName( + queueName: ValidQueueName, + payload: QueuePayload +): string { + // TEMP(ci-default-on): ignore ENFORCE_STRICT_CONCURRENCY so CI exercises the + // per-run topic across all e2e jobs. REVERT BEFORE MERGE — restore the + // `process.env.ENFORCE_STRICT_CONCURRENCY === '1' &&` condition below. + if ( + queueName.startsWith('__wkf_workflow_') && + 'runId' in payload && + typeof payload.runId === 'string' + ) { + return `${queueName}_${payload.runId}`; + } + return queueName; +} + type QueueFunction = ( queueName: ValidQueueName, payload: QueuePayload, @@ -236,11 +263,16 @@ export function createQueue(config?: APIConfig): Queue { // preserving Uint8Array values (workflow input in specVersion >= 2). const wrapper = { payload, + // Keep the logical queue name so the handler and re-enqueue path + // resolve the same per-run physical topic on the next invocation. queueName, // Store deploymentId in the message so it can be preserved when re-enqueueing deploymentId: opts?.deploymentId, }; - const sanitizedQueueName = queueName.replace(/[^A-Za-z0-9-_]/g, '-'); + const sanitizedQueueName = getPhysicalQueueName(queueName, payload).replace( + /[^A-Za-z0-9-_]/g, + '-' + ); try { const { messageId } = await client.send(sanitizedQueueName, wrapper, { idempotencyKey: opts?.idempotencyKey,