Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/fix-broadcast-client-unhandled-rejection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/query-broadcast-client-experimental": patch
---

Fix unhandled `DataCloneError` rejections in `broadcastQueryClient` when `postMessage` fails due to non-cloneable query data (e.g. `ReadableStream`, `Response`, Vue reactive proxies). Adds an optional `onBroadcastError` callback to handle errors explicitly; falls back to `console.warn` in development when not provided.
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
import { QueryClient } from '@tanstack/query-core'
import { beforeEach, describe, expect, it } from 'vitest'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { broadcastQueryClient } from '..'
import type { QueryCache } from '@tanstack/query-core'

const mockPostMessage = vi.fn().mockResolvedValue(undefined)
const mockClose = vi.fn()

vi.mock('broadcast-channel', async (importOriginal) => {
const actual = await importOriginal()
return {
...(actual as object),
BroadcastChannel: class MockBroadcastChannel {
onmessage = null
postMessage = mockPostMessage
close = mockClose
},
}
})

describe('broadcastQueryClient', () => {
let queryClient: QueryClient
let queryCache: QueryCache

beforeEach(() => {
queryClient = new QueryClient()
queryCache = queryClient.getQueryCache()
mockPostMessage.mockResolvedValue(undefined)
mockClose.mockReset()
})

it('should subscribe to the query cache', () => {
Expand All @@ -28,4 +45,112 @@ describe('broadcastQueryClient', () => {
unsubscribe()
expect(queryCache.hasListeners()).toBe(false)
})

describe('postMessage error handling', () => {
let originalEnv: string | undefined

beforeEach(() => {
originalEnv = process.env['NODE_ENV']
})

afterEach(() => {
process.env['NODE_ENV'] = originalEnv
})

it('should not cause an unhandled rejection when onBroadcastError itself throws', async () => {
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const unhandledRejections: Array<unknown> = []
const onUnhandledRejection = (reason: unknown) => {
unhandledRejections.push(reason)
}
process.on('unhandledRejection', onUnhandledRejection)

const onBroadcastError = vi.fn().mockImplementation(() => {
throw new Error('boom')
})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
onBroadcastError,
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))

process.off('unhandledRejection', onUnhandledRejection)

expect(onBroadcastError).toHaveBeenCalledWith(
cloneError,
expect.objectContaining({ type: 'added' }),
)
expect(unhandledRejections).toHaveLength(0)
})

it('should call onBroadcastError when postMessage fails', async () => {
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const onBroadcastError = vi.fn()
broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
onBroadcastError,
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(onBroadcastError).toHaveBeenCalledWith(
cloneError,
expect.objectContaining({ type: 'added' }),
)
})

it('should warn in dev when postMessage fails and onBroadcastError is not provided', async () => {
process.env['NODE_ENV'] = 'development'
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('[broadcastQueryClient]'),
cloneError,
)

warnSpy.mockRestore()
})

it('should not warn in production when postMessage fails', async () => {
process.env['NODE_ENV'] = 'production'
const cloneError = new DOMException('DataCloneError', 'DataCloneError')
mockPostMessage.mockRejectedValueOnce(cloneError)

const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {})

broadcastQueryClient({
queryClient,
broadcastChannel: 'test_channel',
})

queryClient.setQueryData(['test'], { value: 1 })

await new Promise((r) => setTimeout(r, 0))
expect(warnSpy).not.toHaveBeenCalled()

warnSpy.mockRestore()
})
})
})
37 changes: 34 additions & 3 deletions packages/query-broadcast-client-experimental/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@ import { BroadcastChannel } from 'broadcast-channel'
import type { BroadcastChannelOptions } from 'broadcast-channel'
import type { QueryClient } from '@tanstack/query-core'

interface BroadcastMessage {
type: 'updated' | 'removed' | 'added'
queryHash: string
queryKey?: unknown
state?: unknown
}

interface BroadcastQueryClientOptions {
queryClient: QueryClient
broadcastChannel?: string
options?: BroadcastChannelOptions
onBroadcastError?: (error: unknown, message: BroadcastMessage) => void
}

export function broadcastQueryClient({
queryClient,
broadcastChannel = 'tanstack-query',
options,
onBroadcastError,
}: BroadcastQueryClientOptions): () => void {
let transaction = false
const tx = (cb: () => void) => {
Expand All @@ -27,6 +36,28 @@ export function broadcastQueryClient({

const queryCache = queryClient.getQueryCache()

const safePost = (message: BroadcastMessage) => {
channel.postMessage(message).catch((error: unknown) => {
if (onBroadcastError) {
try {
onBroadcastError(error, message)
} catch {
if (process.env.NODE_ENV !== 'production') {
console.warn(
`[broadcastQueryClient] failed to broadcast "${message.type}" for queryHash "${message.queryHash}":`,
error,
)
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else if (process.env.NODE_ENV !== 'production') {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
console.warn(
`[broadcastQueryClient] failed to broadcast "${message.type}" for queryHash "${message.queryHash}":`,
error,
)
}
})
}

const unsubscribe = queryClient.getQueryCache().subscribe((queryEvent) => {
if (transaction) {
return
Expand All @@ -37,7 +68,7 @@ export function broadcastQueryClient({
} = queryEvent

if (queryEvent.type === 'updated' && queryEvent.action.type === 'success') {
channel.postMessage({
safePost({
type: 'updated',
queryHash,
queryKey,
Expand All @@ -46,15 +77,15 @@ export function broadcastQueryClient({
}

if (queryEvent.type === 'removed' && observers.length > 0) {
channel.postMessage({
safePost({
type: 'removed',
queryHash,
queryKey,
})
}

if (queryEvent.type === 'added') {
channel.postMessage({
safePost({
type: 'added',
queryHash,
queryKey,
Expand Down