.cursor() async iterator swallows error when promise rejects between resolution of batch n and requesting batch n+1 #1166

@psteinroe

hey,

thanks for this awesome library!

we encountered a bug with the cursor() method when running against a replica.

Reproduction

run the script below against a replica under load, and make sure the queried data gets updated regularly while it runs. it also helps if the query takes a while to complete.

we added debug logs to postgres.js locally to see what happens. here are the last three logs:

Batch 9

{"type":"batch","label":"replica","iteration":34,"batchCount":9,"batchSize":100,"totalRows":900,"firstRowNumber":"800","lastRowNumber":"899","firstId":"8b2cbb81-420c-4f1c-83a0-a70d1ab9d385","lastId":"9d405b2d-ac5a-4838-8090-11cc71383e7c"}

Error Log In-Between

[postgres-cursor-debug] {"event":"error-response","connectionId":136,"backendPid":2993477, "code":"40001","message":"terminating connection due to conflict with recovery","severity":"FATAL","routine":"ProcessInterrupts"}

Next Iteration: cursor completed with a count mismatch.

{"type":"finish","label":"replica","iteration":34,"outcome":"mismatch","expectedCount":1495,"totalRows":900,"batchCount":9,"lastRowNumber":"899"}

Script to Replicate

The Script
#!/usr/bin/env node

import postgres from 'postgres';

const DATABASE_URL = process.env.DATABASE_URL ?? '<DATABASE_URL>';
const QUERY_PARAM = process.env.QUERY_PARAM ?? '<QUERY_PARAM>';
const BATCH_SIZE = 100;

const META_QUERY = `
  select
    pg_backend_pid() as backend_pid,
    pg_is_in_recovery() as is_in_recovery,
    current_setting('statement_timeout') as statement_timeout
`;

const BASE_QUERY = `
  select
    id,
    row_number() over (order by id) - 1 as row_number
  from your_table_or_function_here
  where your_filter_column = $1
`;

const sql = postgres(DATABASE_URL, {
  max: 1,
  prepare: true,
  connection: {
    application_name: 'cursor-repro',
  },
});

try {
  const [meta] = await sql.unsafe(META_QUERY);
  console.log(JSON.stringify({ type: 'meta', ...meta }));

  const [countRow] = await sql.unsafe(
    `select count(*)::bigint as count from (${BASE_QUERY}) q`,
    [QUERY_PARAM]
  );

  let cursorCount = 0;

  for await (const rows of sql.unsafe(BASE_QUERY, [QUERY_PARAM]).cursor(BATCH_SIZE)) {
    cursorCount += rows.length;

    console.log(
      JSON.stringify({
        type: 'batch',
        batchSize: rows.length,
        cursorCount,
        firstId: rows[0]?.id ?? null,
        lastId: rows.at(-1)?.id ?? null,
        firstRowNumber: rows[0]?.row_number ?? null,
        lastRowNumber: rows.at(-1)?.row_number ?? null,
      })
    );
  }

  const expectedCount = Number(countRow.count);

  console.log(
    JSON.stringify({
      type: 'result',
      expectedCount,
      cursorCount,
      outcome: cursorCount === expectedCount ? 'match' : 'mismatch',
    })
  );
} catch (error) {
  console.error(
    JSON.stringify({
      type: 'error',
      message: error instanceof Error ? error.message : String(error),
      code: error?.code ?? null,
      severity: error?.severity ?? null,
      routine: error?.routine ?? null,
    })
  );

  process.exitCode = 1;
} finally {
  await sql.end({ timeout: 5 });
}

What (i think) happens

next() starts with

postgres/src/query.js

Lines 90 to 91 in e7dfa14

if (this.executed && !this.active)
  return { done: true }

which stops the iterator (without throwing) when !this.active.

for each pending next() call, we set reject on the Query

this.reject = x => (this.active = false, reject(x))

now, the promise for the current batch is settled when PortalSuspended calls query.cursorFn(result) in

postgres/src/connection.js

Lines 839 to 850 in e7dfa14

async function PortalSuspended() {
  try {
    const x = await Promise.resolve(query.cursorFn(result))
    rows = 0
    x === CLOSE
      ? write(Close(query.portal))
      : (result = new Result(), write(Execute('', query.cursorRows)))
  } catch (err) {
    write(Sync)
    query.reject(err)
  }
}

where cursorFn calls resolve({ value, done: false }), as defined in

postgres/src/query.js

Lines 94 to 101 in e7dfa14

const promise = new Promise((resolve, reject) => {
  this.cursorFn = value => {
    resolve({ value, done: false })
    return new Promise(r => prev = r)
  }
  this.resolve = () => (this.active = false, resolve({ done: true }))
  this.reject = x => (this.active = false, reject(x))
})

now an error arrives before the consumer asks for the next batch:

  • query.reject(error) runs
  • active becomes false
  • but that reject is hitting a promise that was already settled for the previous batch

the state of the promise and active are now out of sync, and the error is swallowed. on the next iteration, this.active is false and the cursor stops. no error is thrown, so it looks like the cursor completed successfully even though rows are missing.
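
the sequence above can be reproduced without a database. the following is a simplified model of the iterator's state handling, not the actual postgres.js source: makeCursor and demo are hypothetical names, executed and active are set directly instead of through execute(), and the "server" side is simulated by calling cursorFn and reject by hand.

```javascript
// Simplified model of the cursor iterator state (NOT the real postgres.js code).
function makeCursor() {
  const q = { executed: false, active: false, cursorFn: null, resolve: null, reject: null }
  let prev = null
  q.iterator = {
    [Symbol.asyncIterator]() { return this },
    next() {
      // Same guard as in query.js: stop without throwing when no longer active.
      if (q.executed && !q.active) return Promise.resolve({ done: true })
      prev && prev()
      const promise = new Promise((resolve, reject) => {
        q.cursorFn = value => {
          resolve({ value, done: false })
          return new Promise(r => prev = r)
        }
        q.resolve = () => (q.active = false, resolve({ done: true }))
        q.reject = x => (q.active = false, reject(x))
      })
      // Assumption: in the real library these flags are set while executing the query.
      q.executed = true
      q.active = true
      return promise
    }
  }
  return q
}

async function demo() {
  const q = makeCursor()
  const seen = []
  let thrown = null

  // Consumer: a slow for-await loop, started first so next() is already pending.
  const consumer = (async () => {
    try {
      for await (const rows of q.iterator) {
        seen.push(rows)
        await new Promise(r => setTimeout(r, 10))
      }
    } catch (err) {
      thrown = err
    }
  })()

  // Simulated server: deliver one batch, then fail before the next next() call.
  q.cursorFn(['row 1'])
  q.reject(new Error('terminating connection due to conflict with recovery'))

  await consumer
  return { seen, thrown }
}

demo().then(result => console.log(result))
```

in this model the loop ends after one batch and thrown stays null, which mirrors the "finish" log with the count mismatch.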

Potential Fix

on reject, store the error alongside setting active to false, and have the next call to next() reject with that error if it is set.
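
a minimal sketch of that idea, using a simplified stand-in for the iterator rather than the actual postgres.js source. makeFixedCursor, demoFixed, pendingError and the settled flag are hypothetical names introduced for illustration; a real patch would have to fit into how execute() and the connection manage the query state.

```javascript
// Simplified model with the proposed fix applied (NOT the real postgres.js code).
function makeFixedCursor() {
  const q = { executed: false, active: false, pendingError: null, cursorFn: null, resolve: null, reject: null }
  let prev = null
  q.iterator = {
    [Symbol.asyncIterator]() { return this },
    next() {
      // Surface an error that arrived after the previous batch was delivered.
      if (q.pendingError) {
        const err = q.pendingError
        q.pendingError = null
        return Promise.reject(err)
      }
      if (q.executed && !q.active) return Promise.resolve({ done: true })
      prev && prev()
      let settled = false // tracks whether this next() promise is already settled
      const promise = new Promise((resolve, reject) => {
        q.cursorFn = value => {
          settled = true
          resolve({ value, done: false })
          return new Promise(r => prev = r)
        }
        q.resolve = () => (q.active = false, settled = true, resolve({ done: true }))
        q.reject = x => {
          q.active = false
          if (settled) q.pendingError = x // too late for this promise: keep it for the next call
          else (settled = true, reject(x))
        }
      })
      // Assumption: in the real library these flags are set while executing the query.
      q.executed = true
      q.active = true
      return promise
    }
  }
  return q
}

async function demoFixed() {
  const q = makeFixedCursor()
  const seen = []
  let thrown = null

  const consumer = (async () => {
    try {
      for await (const rows of q.iterator) {
        seen.push(rows)
        await new Promise(r => setTimeout(r, 10))
      }
    } catch (err) {
      thrown = err
    }
  })()

  // Same simulated sequence as in the bug: one batch, then an error in between.
  q.cursorFn(['row 1'])
  q.reject(new Error('terminating connection due to conflict with recovery'))

  await consumer
  return { seen, thrown }
}

demoFixed().then(({ seen, thrown }) => console.log(seen.length, thrown && thrown.message))
```

the settled flag is there so that an error which does reach a pending promise is not reported a second time on a later next() call.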

Workaround

use the callback overload of cursor, i.e. cursor(rows, fn), instead of the async iterator.
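
a rough sketch of the workaround applied to the loop from the script. countRowsWithCallback is a hypothetical helper name; sql is assumed to be a postgres.js client, and query and params stand in for BASE_QUERY and [QUERY_PARAM]. our understanding is that with the callback form a connection error rejects the awaited query itself, so it reaches the surrounding try/catch.

```javascript
// Count rows via the callback overload of cursor() instead of for-await.
// `sql` is expected to be a postgres.js client passed in by the caller.
async function countRowsWithCallback(sql, query, params, batchSize = 100) {
  let cursorCount = 0

  // The callback is invoked once per batch; awaiting the query waits for
  // all batches and rejects if the query fails.
  await sql.unsafe(query, params).cursor(batchSize, async rows => {
    cursorCount += rows.length
  })

  return cursorCount
}
```

in the script, the for await block would then become `cursorCount = await countRowsWithCallback(sql, BASE_QUERY, [QUERY_PARAM], BATCH_SIZE)`.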
