Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/powersync-db-collection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"@standard-schema/spec": "^1.1.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.8.0",
"async-mutex": "^0.5.0",
"debug": "^4.4.3",
"p-defer": "^4.0.1"
},
Expand Down
1 change: 1 addition & 0 deletions packages/powersync-db-collection/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './definitions'
export * from './powersync'
export * from './PowerSyncTransactor'
export * from './sqlite-compiler'
300 changes: 248 additions & 52 deletions packages/powersync-db-collection/src/powersync.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { DiffTriggerOperation, sanitizeSQL } from '@powersync/common'
import { Mutex } from 'async-mutex'
import { or } from '@tanstack/db'
import { compileSQLite } from './sqlite-compiler'
import { PendingOperationStore } from './PendingOperationStore'
import { PowerSyncTransactor } from './PowerSyncTransactor'
import { DEFAULT_BATCH_SIZE } from './definitions'
import { asPowerSyncRecord, mapOperation } from './helpers'
import { convertTableToSchema } from './schema'
import { serializeForSQLite } from './serialization'
import type { LoadSubsetOptions, OperationType, SyncConfig } from '@tanstack/db'
import type {
AnyTableColumnType,
ExtractedTable,
Expand All @@ -24,9 +28,8 @@ import type {
PowerSyncCollectionUtils,
} from './definitions'
import type { PendingOperation } from './PendingOperationStore'
import type { SyncConfig } from '@tanstack/db'
import type { StandardSchemaV1 } from '@standard-schema/spec'
import type { Table, TriggerDiffRecord } from '@powersync/common'
import type { LockContext, Table, TriggerDiffRecord } from '@powersync/common'

/**
* Creates PowerSync collection options for use with a standard Collection.
Expand Down Expand Up @@ -225,6 +228,7 @@ export function powerSyncCollectionOptions<
table,
schema: inputSchema,
syncBatchSize = DEFAULT_BATCH_SIZE,
syncMode = 'eager',
...restConfig
} = config

Expand Down Expand Up @@ -296,11 +300,66 @@ export function powerSyncCollectionOptions<
*/
const sync: SyncConfig<OutputType, string> = {
sync: (params) => {
const { begin, write, commit, markReady } = params
const { begin, write, collection, commit, markReady } = params
const abortController = new AbortController()

// The sync function needs to be synchronous
async function start() {
let disposeTracking: (() => Promise<void>) | null = null

if (syncMode === `eager`) {
return runEagerSync()
} else {
return runOnDemandSync()
}

async function createDiffTrigger(options: {
when: Record<DiffTriggerOperation, string>
writeType: (rowId: string) => OperationType
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) => Promise<Array<TableType>>
onReady: () => void
}) {
const { when, writeType, batchQuery, onReady } = options

return await database.triggers.createDiffTrigger({
source: viewName,
destination: trackedTableName,
when,
hooks: {
beforeCreate: async (context) => {
let currentBatchCount = syncBatchSize
let cursor = 0
while (currentBatchCount == syncBatchSize) {
begin()

const batchItems = await batchQuery(
context,
syncBatchSize,
cursor,
)
currentBatchCount = batchItems.length
cursor += currentBatchCount
for (const row of batchItems) {
write({
type: writeType(row.id),
value: deserializeSyncRow(row),
})
}
commit()
}
onReady()
database.logger.info(
`Sync is ready for ${viewName} into ${trackedTableName}`,
)
},
},
})
}

// The sync function needs to be synchronous.
async function start(afterOnChangeRegistered?: () => Promise<void>) {
database.logger.info(
`Sync is starting for ${viewName} into ${trackedTableName}`,
)
Expand Down Expand Up @@ -362,68 +421,204 @@ export function powerSyncCollectionOptions<
},
)

const disposeTracking = await database.triggers.createDiffTrigger({
source: viewName,
destination: trackedTableName,
when: {
[DiffTriggerOperation.INSERT]: `TRUE`,
[DiffTriggerOperation.UPDATE]: `TRUE`,
[DiffTriggerOperation.DELETE]: `TRUE`,
},
hooks: {
beforeCreate: async (context) => {
let currentBatchCount = syncBatchSize
let cursor = 0
while (currentBatchCount == syncBatchSize) {
begin()
const batchItems = await context.getAll<TableType>(
sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`,
[syncBatchSize, cursor],
)
currentBatchCount = batchItems.length
cursor += currentBatchCount
for (const row of batchItems) {
write({
type: `insert`,
value: deserializeSyncRow(row),
})
}
commit()
}
markReady()
database.logger.info(
`Sync is ready for ${viewName} into ${trackedTableName}`,
)
},
},
})
await afterOnChangeRegistered?.()

// If the abort controller was aborted while processing the request above
if (abortController.signal.aborted) {
await disposeTracking()
await disposeTracking?.()
} else {
abortController.signal.addEventListener(
`abort`,
() => {
disposeTracking()
disposeTracking?.()
},
{ once: true },
)
}
}

start().catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)
// Eager mode.
// Registers a diff trigger for the entire table.
function runEagerSync() {
start(async () => {
disposeTracking = await createDiffTrigger({
when: {
[DiffTriggerOperation.INSERT]: `TRUE`,
[DiffTriggerOperation.UPDATE]: `TRUE`,
[DiffTriggerOperation.DELETE]: `TRUE`,
},
writeType: (_rowId: string) => `insert`,
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) =>
lockContext.getAll<TableType>(
sanitizeSQL`SELECT * FROM ${viewName} LIMIT ? OFFSET ?`,
[batchSize, cursor],
),
onReady: () => markReady(),
})
}).catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)

return () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
)
abortController.abort()
}
}

return () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
// On-demand mode.
// Registers a diff trigger for the active WHERE expressions.
function runOnDemandSync() {
start().catch((error) =>
database.logger.error(
`Could not start syncing process for ${viewName} into ${trackedTableName}`,
error,
),
)
abortController.abort()

// Tracks all active WHERE expressions for on-demand sync filtering.
// Each loadSubset call pushes its predicate; unloadSubset removes it.
const activeWhereExpressions: Array<LoadSubsetOptions['where']> = []
const mutex = new Mutex()

const loadSubset = async (
options?: LoadSubsetOptions,
): Promise<void> => {
if (options) {
activeWhereExpressions.push(options.where)
}

if (activeWhereExpressions.length === 0) {
await disposeTracking?.()
return
}

const combinedWhere =
activeWhereExpressions.length === 1
? activeWhereExpressions[0]
: or(
activeWhereExpressions[0],
activeWhereExpressions[1],
...activeWhereExpressions.slice(2),
)

const compiledNewData = compileSQLite(
{ where: combinedWhere },
{ jsonColumn: 'NEW.data' },
)

const compiledOldData = compileSQLite(
{ where: combinedWhere },
{ jsonColumn: 'OLD.data' },
)

const compiledView = compileSQLite({ where: combinedWhere })

const newDataWhenClause = toInlinedWhereClause(compiledNewData)
const oldDataWhenClause = toInlinedWhereClause(compiledOldData)
const viewWhereClause = toInlinedWhereClause(compiledView)

await disposeTracking?.()
Comment thread
Chriztiaan marked this conversation as resolved.
Outdated

disposeTracking = await createDiffTrigger({
when: {
[DiffTriggerOperation.INSERT]: newDataWhenClause,
[DiffTriggerOperation.UPDATE]: `(${newDataWhenClause}) OR (${oldDataWhenClause})`,
[DiffTriggerOperation.DELETE]: oldDataWhenClause,
},
writeType: (rowId: string) =>
collection.has(rowId) ? `update` : `insert`,
batchQuery: (
lockContext: LockContext,
batchSize: number,
cursor: number,
) =>
lockContext.getAll<TableType>(
`SELECT * FROM ${viewName} WHERE ${viewWhereClause} LIMIT ? OFFSET ?`,
[batchSize, cursor],
),
onReady: () => {},
})
}

const toInlinedWhereClause = (compiled: {
where?: string
params: Array<unknown>
}): string => {
if (!compiled.where) return 'TRUE'
const sqlParts = compiled.where.split('?')
return sanitizeSQL(
sqlParts as unknown as TemplateStringsArray,
...compiled.params,
)
}

const unloadSubset = async (options: LoadSubsetOptions) => {
const idx = activeWhereExpressions.indexOf(options.where)
if (idx !== -1) {
activeWhereExpressions.splice(idx, 1)
}

// Evict rows that were exclusively loaded by the departing predicate.
// These are rows matching the departing WHERE that are no longer covered
// by any remaining active predicate.
const compiledDeparting = compileSQLite({ where: options.where })
const departingWhereSQL = toInlinedWhereClause(compiledDeparting)

let evictionSQL: string
if (activeWhereExpressions.length === 0) {
evictionSQL = `SELECT id FROM ${viewName} WHERE ${departingWhereSQL}`
} else {
const combinedRemaining =
activeWhereExpressions.length === 1
? activeWhereExpressions[0]!
: or(
activeWhereExpressions[0],
activeWhereExpressions[1],
...activeWhereExpressions.slice(2),
)
const compiledRemaining = compileSQLite({
where: combinedRemaining,
})
const remainingWhereSQL = toInlinedWhereClause(compiledRemaining)
evictionSQL = `SELECT id FROM ${viewName} WHERE (${departingWhereSQL}) AND NOT (${remainingWhereSQL})`
}

const rowsToEvict = await database.getAll<{ id: string }>(evictionSQL)
if (rowsToEvict.length > 0) {
begin()
for (const { id } of rowsToEvict) {
write({ type: `delete`, key: id })
}
commit()
}

// Recreate the diff trigger for the remaining active WHERE expressions.
await loadSubset()
}

markReady()

return {
cleanup: () => {
database.logger.info(
`Sync has been stopped for ${viewName} into ${trackedTableName}`,
)
abortController.abort()
},
loadSubset: (options: LoadSubsetOptions) =>
mutex.runExclusive(() => loadSubset(options)),
unloadSubset: (options: LoadSubsetOptions) =>
mutex.runExclusive(() => unloadSubset(options)),
}
}
},
// Expose the getSyncMetadata function
Expand All @@ -442,6 +637,7 @@ export function powerSyncCollectionOptions<
getKey,
// Syncing should start immediately since we need to monitor the changes for mutations
startSync: true,
syncMode,
sync,
onInsert: async (params) => {
// The transaction here should only ever contain a single insert mutation
Expand Down
Loading
Loading