Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/hook-sleep-replay-ordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/core": patch
---

Fix `CorruptedEventLogError` on replay when a workflow races a hook read against a `sleep()` (e.g. `Promise.race([hook, sleep])`). Branch-deciding deliveries (buffered hook payloads and wait completions) are now handed to the workflow in strict event-log order — anchored on event position rather than on microtask-resolution timing — so the committed branch wins the race deterministically, independent of decryption/hydration time or `Promise.race` argument order.
78 changes: 78 additions & 0 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
onWorkflowError: vi.fn(),
promiseQueue: Promise.resolve(),
pendingDeliveries: 0,
pendingDeliveryBarriers: new Map(),
};
}

Expand Down Expand Up @@ -682,4 +683,81 @@ describe('async deserialization ordering', () => {
// Step A must resolve before step B (event log order)
expect(resolveOrder).toEqual(['A:value_A', 'B:value_B']);
});

// Regression for the buffered-hook delivery rework. The hydration outcome of
// a buffered payload is held back and only becomes a settled promise once a
// consumer claims it. A failed hydration must therefore never reject a
// promise that nobody has attached a handler to yet, because that would
// surface as an unhandled rejection and crash the process.
it('should not emit an unhandled rejection before a buffered hook payload is claimed', async () => {
  const seenRejections: unknown[] = [];
  const recordRejection = (reason: unknown) => {
    seenRejections.push(reason);
  };
  process.on('unhandledRejection', recordRejection);

  try {
    // A single buffered hook payload that cannot be hydrated: it carries the
    // encrypted-data marker but the context has no key to decrypt it with.
    const ctx = setupWorkflowContext([
      {
        eventId: 'evnt_0',
        runId: 'wrun_test',
        eventType: 'hook_received',
        correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV',
        eventData: {
          payload: new Uint8Array([101, 110, 99, 114]), // "encr" without a key
        },
        createdAt: new Date(),
      },
    ]);

    const hook = createCreateHook(ctx)();

    // Yield a full macrotask turn BEFORE claiming the hook, so any premature
    // rejection would already have been reported by now.
    await new Promise<void>((done) => {
      setTimeout(done, 0);
    });
    expect(seenRejections).toEqual([]);

    // Claiming the hook is what surfaces the captured hydration failure.
    await expect(hook).rejects.toThrow(
      'Encrypted data encountered but no encryption key'
    );
  } finally {
    process.off('unhandledRejection', recordRejection);
  }
});

// Regression for the delivery-barrier registry. A buffered hook that is never
// claimed (another branch won) must still have its barrier retired at idle;
// otherwise `pendingDeliveryBarriers` would keep one entry for every
// abandoned payload.
it('should discard an unclaimed hook delivery barrier after a later wait proceeds', async () => {
  const wakeAt = new Date('2024-01-01T00:00:05.000Z');
  const unusedPayload = await dehydrateStepReturnValue(
    'unused',
    'wrun_test',
    undefined
  );

  // Event log: a hook payload arrives first, then the wait completes.
  const ctx = setupWorkflowContext([
    {
      eventId: 'evnt_0',
      runId: 'wrun_test',
      eventType: 'hook_received',
      correlationId: 'hook_01K11TFZ62YS0YYFDQ3E8B9YCV',
      eventData: { payload: unusedPayload },
      createdAt: new Date(),
    },
    {
      eventId: 'evnt_1',
      runId: 'wrun_test',
      eventType: 'wait_completed',
      correlationId: 'wait_01K11TFZ62YS0YYFDQ3E8B9YCW',
      eventData: { resumeAt: wakeAt },
      createdAt: new Date(),
    },
  ]);

  const makeHook = createCreateHook(ctx);
  const pause = createSleep(ctx);

  // The hook is created but deliberately never awaited; only the sleep is.
  makeHook();
  await pause(wakeAt);

  const remainingBarriers = ctx.pendingDeliveryBarriers?.size;
  expect(remainingBarriers).toBe(0);
});
});
22 changes: 20 additions & 2 deletions packages/core/src/hook-sleep-interaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
promiseQueueHolder.current = value;
},
pendingDeliveries: 0,
pendingDeliveryBarriers: new Map(),
};
ctxRef.current = ctx;
return ctx;
Expand Down Expand Up @@ -314,7 +315,7 @@ function defineTests(mode: 'sync' | 'async') {
expect(result).toEqual(['first', 'second']);
});

it.fails('should let a queued hook payload win when a reused wait completes after the step that installs the race', async () => {
it('should let a queued hook payload win when a reused wait completes after the step that installs the race', async () => {
await setupHydrateMock();
const ops: Promise<any>[] = [];
const [payload, setupResult] = await Promise.all([
Expand Down Expand Up @@ -566,7 +567,7 @@ function defineTests(mode: 'sync' | 'async') {
);
}

it.fails('should let a queued hook payload win without mapping the race promises', async () => {
it('should let a queued hook payload win without mapping the race promises', async () => {
await expectRawRaceToChooseQueuedHook(false);
});

Expand Down Expand Up @@ -793,6 +794,23 @@ function defineTests(mode: 'sync' | 'async') {
);
});

// KNOWN-INVALID (kept as `it.fails`): this scenario is not reproducible by
// the workflow under test and asserts an impossible replay outcome.
//
// In loop 1, `pendingSleep` is the REUSED sleep, whose `wait_completed`
// (evnt_8) is consumed BEFORE loop 1 begins — so `pendingSleep` is an
// already-resolved promise by the time loop 1 runs. Loop 1's
// `iterator.next()` then claims the only remaining buffered payload
// (`hook_received` evnt_9), which also resolves. With BOTH inputs resolved,
// `Promise.race([pendingRead, pendingSleep])` resolves to the FIRST array
// element by JS semantics — `pendingRead` (the hook). The runtime cannot
// change that tie-break; it lives in the workflow author's race-argument
// order. So a real producer run of this workflow would take the HOOK
// branch in loop 1 (recording `drainStep`), never the sleep branch this
// hand-built log encodes (`progressStep` at evnt_14). The fix for the real
// hook-vs-sleep ordering bug is verified by the sibling tests; this case
// should be re-worked (e.g. so the sleep is not pre-resolved at the race)
// or removed. See PR discussion.
it.fails('should preserve the early waiter with a reused sleep when wait completion wins', async () => {
const { ctx, error } = await replayEarlyWaiterAcrossDrain({
winner: 'sleep',
Expand Down
132 changes: 132 additions & 0 deletions packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Utils used by the bundler when transforming code
*/

import { withResolvers } from '@workflow/utils';
import type { CryptoKey } from './encryption.js';
import type { EventsConsumer } from './events-consumer.js';
import type { QueueItem } from './global.js';
Expand Down Expand Up @@ -154,6 +155,137 @@ export interface WorkflowOrchestratorContext {
* to reach 0 before firing, to avoid preempting data delivery.
*/
pendingDeliveries: number;
/**
* Ordered registry of in-flight "branch-deciding" deliveries — the
* resolutions a workflow typically `Promise.race`s on: buffered hook
* payloads (`hook_received`) and wait completions (`wait_completed`).
* Keyed by the delivery's position (index) in the consumed event log.
*
* The problem: a buffered hook payload is observed via the async hook
* iterator (`yield await this`), costing extra microtask hops, while a
* `wait_completed` resolves with fewer hops — and a reused sleep can
* resolve in an entirely earlier loop iteration. Either way, the
* resolution that the committed event log ordered first can lose a
* `Promise.race` to a faster- or already-resolved competitor, diverging
* from the log and surfacing as `CorruptedEventLogError`.
*
* The fix is a strict, deterministic delivery order anchored on
* event-log position: a delivery does not resolve to the workflow until
* every earlier-in-log delivery of the OPPOSITE kind has been delivered.
* (Opposite kind only: sequential same-kind hook payloads must not block
* one another, and a wait need not wait behind an earlier wait.) Because the
* gate is "the earlier delivery resolved", not "won a timing race", the
* outcome is independent of microtask hops, hydration/decryption time,
* and `Promise.race` argument order.
*
* Index is used rather than the `eventId` string because `eventId` is an
* opaque, world-assigned value not guaranteed to sort in creation order
* (only the bundled ULID worlds happen to).
*
* Optional so older/out-of-tree contexts (and lightweight test harnesses)
* that do not initialize it degrade gracefully to the previous behavior.
*/
pendingDeliveryBarriers?: Map<number, DeliveryBarrierEntry>;
}

/**
 * The kind of branch-deciding delivery a barrier represents: `'hook'` for a
 * buffered hook payload (`hook_received`), `'wait'` for a wait completion
 * (`wait_completed`).
 */
export type DeliveryKind = 'hook' | 'wait';

/**
 * Registry record for one in-flight branch-deciding delivery. These are the
 * values stored in `WorkflowOrchestratorContext.pendingDeliveryBarriers`,
 * keyed by the delivery's index in the consumed event log.
 */
interface DeliveryBarrierEntry {
/** Which kind of delivery this is; used to filter which barriers a later delivery defers behind. */
kind: DeliveryKind;
/** Resolves once this delivery has resolved to the workflow. */
delivered: Promise<void>;
}

/**
 * Holds the caller back until every still-registered delivery that sits
 * earlier in the event log than `eventIndex`, and whose kind appears in
 * `deferBehindKinds`, has been delivered to the workflow. The matching
 * barriers are awaited together (via `Promise.all`), so the caller resumes
 * only after the last of them resolves.
 *
 * This gate is what keeps a `Promise.race` in workflow code aligned with the
 * committed event log regardless of microtask-hop counts, hydration time, or
 * the order of the race arguments.
 *
 * Callers pass the OPPOSITE kind(s) in `deferBehindKinds`: a hook defers
 * behind earlier waits (earlier hooks are sequential payloads of the same
 * entity and must not block it), and a wait defers behind earlier hooks.
 *
 * @param ctx - Orchestrator context holding the barrier registry.
 * @param eventIndex - Event-log position of the delivery being gated; when
 *   `undefined` no ordering is applied.
 * @param deferBehindKinds - Barrier kinds this delivery must wait behind.
 * @returns A promise that resolves once all matching earlier barriers have.
 */
export async function awaitEarlierDeliveries(
  ctx: WorkflowOrchestratorContext,
  eventIndex: number | undefined,
  deferBehindKinds: readonly DeliveryKind[]
): Promise<void> {
  // Without a log position there is nothing to order against.
  if (eventIndex === undefined) {
    return;
  }

  // Defensive: contexts that predate this field (e.g. test harnesses) may
  // not carry a registry at all; an empty one needs no work either.
  const registry = ctx.pendingDeliveryBarriers;
  if (!registry || registry.size === 0) {
    return;
  }

  const blockers = [...registry.entries()]
    .filter(
      ([position, barrier]) =>
        position < eventIndex && deferBehindKinds.includes(barrier.kind)
    )
    .map(([, barrier]) => barrier.delivered);

  if (blockers.length === 0) {
    return;
  }
  await Promise.all(blockers);
}

/**
 * Handle for a registered branch-deciding delivery barrier, as returned by
 * {@link registerDeliveryBarrier}.
 */
export interface DeliveryBarrier {
/**
* Mark this delivery as delivered to the workflow. Resolves its
* `delivered` promise so any later-in-log opposite-kind delivery gated on
* it (via {@link awaitEarlierDeliveries}) may proceed, and removes it from
* the registry. Idempotent: calls after the first are no-ops. On the inert
* handle (returned when no registry or event index is available) it does
* nothing at all.
*/
markDelivered: () => void;
}

/**
 * Records a branch-deciding delivery in the context's barrier registry under
 * its event-log index, so that later opposite-kind deliveries can be held
 * until this one has been handed to the workflow.
 *
 * When the context has no `pendingDeliveryBarriers` registry, or no
 * `eventIndex` is supplied, nothing is registered and an inert handle is
 * returned.
 *
 * A registered barrier is also scheduled to settle at idle. That way a
 * delivery the workflow never observes (it took another branch, or the run
 * is suspending) can neither block a later delivery forever nor leave a
 * stale entry behind in the registry.
 *
 * @param ctx - Orchestrator context holding the barrier registry.
 * @param eventIndex - Position of this delivery in the consumed event log.
 * @param kind - Whether this delivery is a hook payload or a wait completion.
 * @returns A handle whose `markDelivered` settles the barrier (idempotent).
 */
export function registerDeliveryBarrier(
  ctx: WorkflowOrchestratorContext,
  eventIndex: number | undefined,
  kind: DeliveryKind
): DeliveryBarrier {
  const registry = ctx.pendingDeliveryBarriers;
  if (eventIndex === undefined || !registry) {
    const noop = (): void => {};
    return { markDelivered: noop };
  }

  const { promise: delivered, resolve: release } = withResolvers<void>();
  const record: DeliveryBarrierEntry = { kind, delivered };
  registry.set(eventIndex, record);

  let settled = false;
  const settle = (): void => {
    if (settled) {
      return;
    }
    settled = true;
    // Evict only our own record: the slot may since have been taken over by
    // a newer registration at the same index.
    if (registry.get(eventIndex) === record) {
      registry.delete(eventIndex);
    }
    release();
  };

  // Safety net for abandoned deliveries: settle at idle so nothing gated on
  // this barrier can deadlock and the registry does not accumulate entries.
  scheduleWhenIdle(ctx, settle);

  return { markDelivered: settle };
}

/**
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
onWorkflowError: vi.fn(),
promiseQueue: Promise.resolve(),
pendingDeliveries: 0,
pendingDeliveryBarriers: new Map(),
};
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export async function runWorkflow(
promiseQueueHolder.current = value;
},
pendingDeliveries: 0,
pendingDeliveryBarriers: new Map(),
};

// Subscribe to the events log to update the timestamp in the vm context
Expand Down
Loading
Loading