Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
88987e9
refactor(dao): Filter reactions by userId at DB level.
VelikovPetar May 19, 2026
fe4d4e2
refactor(dao): Filter reactions by userId at DB level (pinned messages).
VelikovPetar May 19, 2026
5652130
Merge branch 'refs/heads/master' into feature/FLU-485_optimize_read_m…
VelikovPetar May 19, 2026
466d9d3
refactor(dao): optimize message retrieval with SQL-side pagination fi…
VelikovPetar May 20, 2026
e1ec859
refactor(dao): fix formatting
VelikovPetar May 21, 2026
d38c049
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar May 21, 2026
4689606
refactor(dao): Update CHANGELOG.md
VelikovPetar May 21, 2026
edb86f7
refactor(dao): Apply formatting
VelikovPetar May 21, 2026
4efc8c4
refactor(dao): optimize message and reaction retrieval with grouped q…
VelikovPetar May 22, 2026
9553c2d
refactor(dao): add tests for new methods
VelikovPetar May 22, 2026
b580ccd
refactor(dao): enhance message pagination logic to support inclusive …
VelikovPetar May 25, 2026
52f0c26
refactor(dao): Update docs
VelikovPetar May 25, 2026
d7a18d6
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar May 25, 2026
e465f81
Merge branch 'feature/FLU-485_optimize_read_message_from_db' into fea…
VelikovPetar May 25, 2026
0dfd4fe
refactor(dao): Remove legacy method
VelikovPetar May 25, 2026
80719e6
refactor(dao): Remove legacy method
VelikovPetar May 25, 2026
1da0c45
refactor(dao): Remove legacy method
VelikovPetar May 25, 2026
cc443dc
refactor(dao): Fix warnings
VelikovPetar May 25, 2026
2e719f4
refactor(dao): Add message.id tiebreakier
VelikovPetar May 25, 2026
35569a0
refactor(dao): Add message.id tiebreaker
VelikovPetar May 25, 2026
fa6898e
refactor(dao): Update CHANGELOG.md
VelikovPetar May 25, 2026
c04a7cf
Merge branch 'refs/heads/feature/FLU-485_optimize_read_message_from_d…
VelikovPetar May 26, 2026
3b385f3
perf(llc): Move pinned messages pagination to SQL
VelikovPetar May 26, 2026
bb41f45
Merge remote-tracking branch 'origin/master' into feature/FLU-487_mov…
VelikovPetar Jun 5, 2026
f6b1b11
fix(persistence): Add tests.
VelikovPetar Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method.
- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
- Read only the messages matching the `PaginationParams` from DB when calling `PinnedMessageDao.getMessagesByCid` instead of reading all pinned messages for the channel and applying pagination in memory.
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
- Improve the message read times from DB.
Expand All @@ -13,10 +14,8 @@
- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.


🐛 Fixed

- `PinnedMessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.
- `PinnedMessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail.
- Fixed missing persistence of the `team` field on channel entities.

🔄 Changed
Expand Down
116 changes: 90 additions & 26 deletions packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,38 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final (
lessThanCursor,
lessThanOrEqualCursor,
greaterThanCursor,
greaterThanOrEqualCursor,
) = await (
_lookupCursor(messagePagination?.lessThan),
_lookupCursor(messagePagination?.lessThanOrEqual),
_lookupCursor(messagePagination?.greaterThan),
_lookupCursor(messagePagination?.greaterThanOrEqual),
Comment on lines +246 to +255
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Scope cursor lookup to the active channel.

_lookupCursor resolves by id only. If a cursor id exists in another channel, it can still produce a cutoff and incorrectly filter this channel instead of behaving like a no-op. Please scope lookup by channelCid and pass cid from getMessagesByCid.

Suggested fix
-    ) = await (
-      _lookupCursor(messagePagination?.lessThan),
-      _lookupCursor(messagePagination?.lessThanOrEqual),
-      _lookupCursor(messagePagination?.greaterThan),
-      _lookupCursor(messagePagination?.greaterThanOrEqual),
+    ) = await (
+      _lookupCursor(cid, messagePagination?.lessThan),
+      _lookupCursor(cid, messagePagination?.lessThanOrEqual),
+      _lookupCursor(cid, messagePagination?.greaterThan),
+      _lookupCursor(cid, messagePagination?.greaterThanOrEqual),
     ).wait;
...
-  Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
+  Future<({DateTime createdAt, String id})?> _lookupCursor(
+    String cid,
+    String? id,
+  ) async {
     if (id == null) return null;
     final createdAt = await (selectOnly(pinnedMessages)
           ..addColumns([pinnedMessages.createdAt])
           ..where(pinnedMessages.id.equals(id))
+          ..where(pinnedMessages.channelCid.equals(cid))
           ..where(
             pinnedMessages.parentId.isNull() |
                 pinnedMessages.showInChannel.equals(true),
           ))
         .map((row) => row.read(pinnedMessages.createdAt))
         .getSingleOrNull();

Also applies to: 358-366

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart` around
lines 246 - 255, The cursor lookup currently calls
_lookupCursor(messagePagination?.lessThan/lessThanOrEqual/greaterThan/greaterThanOrEqual)
which resolves by id only and can match cursors from other channels; update
_lookupCursor to accept a cid/channelCid parameter (e.g., _lookupCursor(cid,
cursorId)) and change the calls inside getMessagesByCid to pass the active
channel's cid so lookups are scoped to that channel; also update the other
occurrence (the block around the second set of calls) to pass cid when invoking
_lookupCursor and adjust the helper implementation to filter by channelCid when
resolving cursor ids.

).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
// recent (closest to a `lessThan` cursor, or the channel tail when no
// cursor is set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
lessThanCursor == null &&
lessThanOrEqualCursor == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(pinnedMessages.createdAt),
OrderingTerm.asc(pinnedMessages.id),
]
: [
OrderingTerm.desc(pinnedMessages.createdAt),
OrderingTerm.desc(pinnedMessages.id),
];

final query = select(pinnedMessages).join([
leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)),
leftOuterJoin(
Expand All @@ -253,35 +285,48 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
..where(pinnedMessages.channelCid.equals(cid))
..where(pinnedMessages.parentId.isNull() |
pinnedMessages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]);
..orderBy(orderBy);

final rows = await query.get();
final msgList = await _messagesFromJoinRows(rows, fetchDraft: fetchDraft);
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
// fall on the correct side of the boundary. Filtering on `createdAt` alone
// would skip or repeat those siblings across pages.
if (lessThanCursor case final c?) {
query.where(
pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) |
(pinnedMessages.createdAt.equals(c.createdAt) &
pinnedMessages.id.isSmallerThanValue(c.id)),
);
}
if (lessThanOrEqualCursor case final c?) {
query.where(
pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) |
(pinnedMessages.createdAt.equals(c.createdAt) &
pinnedMessages.id.isSmallerOrEqualValue(c.id)),
);
}
if (greaterThanCursor case final c?) {
query.where(
pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) |
(pinnedMessages.createdAt.equals(c.createdAt) &
pinnedMessages.id.isBiggerThanValue(c.id)),
);
}
if (greaterThanOrEqualCursor case final c?) {
query.where(
pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) |
(pinnedMessages.createdAt.equals(c.createdAt) &
pinnedMessages.id.isBiggerOrEqualValue(c.id)),
);
}

if (msgList.isNotEmpty) {
final mutable = msgList.toList();
if (messagePagination?.lessThan != null) {
final lessThanIndex = mutable.indexWhere(
(m) => m.id == messagePagination!.lessThan,
);
if (lessThanIndex != -1) {
mutable.removeRange(lessThanIndex, mutable.length);
}
}
if (messagePagination?.greaterThan != null) {
final greaterThanIndex = mutable.indexWhere(
(m) => m.id == messagePagination!.greaterThan,
);
if (greaterThanIndex != -1) {
mutable.removeRange(0, greaterThanIndex);
}
}
if (messagePagination?.limit != null) {
return mutable.take(messagePagination!.limit).toList();
}
return mutable;
if (messagePagination != null) {
query.limit(messagePagination.limit);
}
return msgList;

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();
return _messagesFromJoinRows(orderedRows, fetchDraft: fetchDraft);
}

/// Updates the message data of a particular channel with
Expand All @@ -305,4 +350,23 @@ class PinnedMessageDao extends DatabaseAccessor<DriftChatDatabase>
(batch) => batch.insertAllOnConflictUpdate(pinnedMessages, entities),
);
}

/// Returns the `(createdAt, id)` cursor for the pinned message with [id] in
/// the local cache, or `null` if [id] is null, the message isn't cached, or
/// isn't visible in the channel (i.e. a thread reply with
/// `showInChannel = false`).
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
if (id == null) return null;
final createdAt = await (selectOnly(pinnedMessages)
..addColumns([pinnedMessages.createdAt])
..where(pinnedMessages.id.equals(id))
..where(
pinnedMessages.parentId.isNull() |
pinnedMessages.showInChannel.equals(true),
))
.map((row) => row.read(pinnedMessages.createdAt))
.getSingleOrNull();
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}
}
Loading
Loading