Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/fix-broadcast-client-unhandled-rejection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/query-broadcast-client-experimental": patch
---

Fix unhandled `DataCloneError` rejections in `broadcastQueryClient` when `postMessage` fails due to non-cloneable query data (e.g. `ReadableStream`, `Response`, Vue reactive proxies). Adds an optional `onBroadcastError` callback to handle errors explicitly; falls back to `console.warn` in development when not provided.
39 changes: 39 additions & 0 deletions docs/framework/react/plugins/broadcastQueryClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ interface BroadcastQueryClientOptions {
broadcastChannel?: string
/** Options for the BroadcastChannel API */
options?: BroadcastChannelOptions
/**
* Called when a query event fails to broadcast to other tabs — most
* commonly because the query's data, error, or key contains a value the
* structured-clone algorithm cannot serialize (e.g. `ReadableStream`,
* `File`, functions, Vue `reactive` proxies).
*
* If omitted, a `console.warn` is emitted in development so failures
* are never entirely silent. May return a `Promise`; any rejection is
* caught internally.
*/
onBroadcastError?: (error: unknown, event: BroadcastErrorEvent) => void | Promise<void>
}

interface BroadcastErrorEvent {
type: 'updated' | 'removed' | 'added'
queryHash: string
queryKey: QueryKey
}
```

Expand All @@ -59,3 +76,25 @@ The default options are:
broadcastChannel = 'tanstack-query',
}
```

## Handling broadcast errors

If your cache can hold values that are not structured-cloneable — such as `ReadableStream` (from `Response.body`, streaming APIs, or AI SDKs), `File`, functions, or framework proxies like Vue `reactive` — the underlying `BroadcastChannel.postMessage` call will reject for that query. Cross-tab sync is skipped for that query; the rest of the cache continues to broadcast normally.

By default, a `console.warn` is emitted in development so failures are never silent. Provide `onBroadcastError` to route failures to your own error tracker:

```tsx
import * as Sentry from '@sentry/browser'
import { broadcastQueryClient } from '@tanstack/query-broadcast-client-experimental'

broadcastQueryClient({
queryClient,
broadcastChannel: 'my-app',
onBroadcastError: (error, event) => {
Sentry.captureException(error, {
tags: { broadcastEvent: event.type },
extra: { queryHash: event.queryHash, queryKey: event.queryKey },
})
},
})
```
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
import { QueryClient } from '@tanstack/query-core'
import { beforeEach, describe, expect, it } from 'vitest'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { broadcastQueryClient } from '..'
import type { BroadcastErrorEvent } from '..'
import type { QueryCache } from '@tanstack/query-core'

const mockPostMessage = vi.fn().mockResolvedValue(undefined)
const mockClose = vi.fn()

vi.mock('broadcast-channel', async (importOriginal) => {
const actual = await importOriginal()
return {
...(actual as object),
BroadcastChannel: class MockBroadcastChannel {
onmessage = null
postMessage = mockPostMessage
close = mockClose
},
}
})

describe('broadcastQueryClient', () => {
let queryClient: QueryClient
let queryCache: QueryCache

beforeEach(() => {
queryClient = new QueryClient()
queryCache = queryClient.getQueryCache()
mockPostMessage.mockResolvedValue(undefined)
mockClose.mockReset()
})

it('should subscribe to the query cache', () => {
Expand All @@ -28,4 +46,161 @@ describe('broadcastQueryClient', () => {
unsubscribe()
expect(queryCache.hasListeners()).toBe(false)
})

describe('postMessage error handling', () => {
let originalEnv: string | undefined

beforeEach(() => {
originalEnv = process.env['NODE_ENV']
})

afterEach(() => {
process.env['NODE_ENV'] = originalEnv
})

it('should not cause an unhandled rejection when onBroadcastError itself throws', async () => {
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const unhandledRejections: Array<unknown> = []
const onUnhandledRejection = (reason: unknown) => {
unhandledRejections.push(reason)
}
process.on('unhandledRejection', onUnhandledRejection)

try {
const onBroadcastError = vi.fn().mockImplementation(() => {
throw new Error('boom')
})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
onBroadcastError,
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))

expect(onBroadcastError).toHaveBeenCalledWith(
cloneError,
expect.objectContaining<BroadcastErrorEvent>({
type: 'added',
queryHash: expect.any(String) as string,
queryKey: ['test'],
}),
)
expect(unhandledRejections).toHaveLength(0)
} finally {
process.off('unhandledRejection', onUnhandledRejection)
}
})

it('should not cause an unhandled rejection when async onBroadcastError rejects', async () => {
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const unhandledRejections: Array<unknown> = []
const onUnhandledRejection = (reason: unknown) => {
unhandledRejections.push(reason)
}
process.on('unhandledRejection', onUnhandledRejection)

try {
const onBroadcastError = vi
.fn()
.mockRejectedValueOnce(new Error('async boom'))

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
onBroadcastError,
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 10))

expect(onBroadcastError).toHaveBeenCalledWith(
cloneError,
expect.objectContaining<BroadcastErrorEvent>({
type: 'added',
queryHash: expect.any(String) as string,
queryKey: ['test'],
}),
)
expect(unhandledRejections).toHaveLength(0)
} finally {
process.off('unhandledRejection', onUnhandledRejection)
}
})

it('should call onBroadcastError when postMessage fails', async () => {
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const onBroadcastError = vi.fn()
broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
onBroadcastError,
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(onBroadcastError).toHaveBeenCalledWith(
cloneError,
expect.objectContaining<BroadcastErrorEvent>({
type: 'added',
queryHash: expect.any(String) as string,
queryKey: ['test'],
}),
)
})

it('should warn in dev when postMessage fails and onBroadcastError is not provided', async () => {
process.env['NODE_ENV'] = 'development'
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('cross-tab sync for this query was skipped'),
cloneError,
)

warnSpy.mockRestore()
})

it('should not warn in production when postMessage fails', async () => {
process.env['NODE_ENV'] = 'production'
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(warnSpy).not.toHaveBeenCalled()

warnSpy.mockRestore()
})
})
})
84 changes: 79 additions & 5 deletions packages/query-broadcast-client-experimental/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,56 @@
import { BroadcastChannel } from 'broadcast-channel'
import type { BroadcastChannelOptions } from 'broadcast-channel'
import type { QueryClient } from '@tanstack/query-core'
import type { QueryClient, QueryKey } from '@tanstack/query-core'

/**
* Metadata describing a broadcast that failed to be delivered to other tabs.
* Passed to {@link BroadcastQueryClientOptions.onBroadcastError} so callers
* can correlate failures with the originating query.
*/
export interface BroadcastErrorEvent {
type: 'updated' | 'removed' | 'added'
queryHash: string
queryKey: QueryKey
}

type BroadcastMessage =
| { type: 'updated'; queryHash: string; queryKey: QueryKey; state: unknown }
| { type: 'removed'; queryHash: string; queryKey: QueryKey }
| { type: 'added'; queryHash: string; queryKey: QueryKey }
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

interface BroadcastQueryClientOptions {
/** The QueryClient to sync. */
queryClient: QueryClient
/**
* Unique channel name used to communicate between tabs and windows.
* @default 'tanstack-query'
*/
broadcastChannel?: string
/** Options forwarded to the underlying `BroadcastChannel`. */
options?: BroadcastChannelOptions
/**
* Called when a query event fails to broadcast to other tabs — most
* commonly because the query's `state.data`, `state.error`, or `queryKey`
* contains a value the structured-clone algorithm cannot serialize
* (e.g. `ReadableStream`, `File`, functions, Vue `reactive` proxies).
*
* Provide this to route failures to an error tracker. If omitted, a
* `console.warn` is emitted in development so failures are never silent.
*
* May return a `Promise`; any rejection is caught internally so it cannot
* cause a secondary unhandled rejection.
*/
onBroadcastError?: (
error: unknown,
event: BroadcastErrorEvent,
) => void | Promise<void>
}

export function broadcastQueryClient({
queryClient,
broadcastChannel = 'tanstack-query',
options,
onBroadcastError,
}: BroadcastQueryClientOptions): () => void {
let transaction = false
const tx = (cb: () => void) => {
Expand All @@ -27,7 +66,42 @@ export function broadcastQueryClient({

const queryCache = queryClient.getQueryCache()

const unsubscribe = queryClient.getQueryCache().subscribe((queryEvent) => {
const safePost = (message: BroadcastMessage): void => {
channel.postMessage(message).catch((error: unknown) => {
const event: BroadcastErrorEvent = {
type: message.type,
queryHash: message.queryHash,
queryKey: message.queryKey,
}

if (onBroadcastError) {
const warnCallbackError = (callbackError: unknown) => {
if (process.env.NODE_ENV !== 'production') {
console.warn(
`[broadcastQueryClient] onBroadcastError threw while handling "${event.type}" for query ${event.queryHash}.`,
callbackError,
)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
let result: void | Promise<void>
try {
result = onBroadcastError(error, event)
} catch (callbackError) {
warnCallbackError(callbackError)
return
}
result?.catch(warnCallbackError)
} else if (process.env.NODE_ENV !== 'production') {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
console.warn(
`[broadcastQueryClient] Failed to broadcast "${event.type}" event for query ${event.queryHash}. ` +
'The query value could not be structured-cloned; cross-tab sync for this query was skipped.',
error,
)
}
})
}

const unsubscribe = queryCache.subscribe((queryEvent) => {
if (transaction) {
return
}
Expand All @@ -37,7 +111,7 @@ export function broadcastQueryClient({
} = queryEvent

if (queryEvent.type === 'updated' && queryEvent.action.type === 'success') {
channel.postMessage({
safePost({
type: 'updated',
queryHash,
queryKey,
Expand All @@ -46,15 +120,15 @@ export function broadcastQueryClient({
}

if (queryEvent.type === 'removed' && observers.length > 0) {
channel.postMessage({
safePost({
type: 'removed',
queryHash,
queryKey,
})
}

if (queryEvent.type === 'added') {
channel.postMessage({
safePost({
type: 'added',
queryHash,
queryKey,
Expand Down