Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
88987e9
refactor(dao): Filter reactions by userId at DB level.
VelikovPetar May 19, 2026
fe4d4e2
refactor(dao): Filter reactions by userId at DB level (pinned messages).
VelikovPetar May 19, 2026
5652130
Merge branch 'refs/heads/master' into feature/FLU-485_optimize_read_m…
VelikovPetar May 19, 2026
466d9d3
refactor(dao): optimize message retrieval with SQL-side pagination fi…
VelikovPetar May 20, 2026
e1ec859
refactor(dao): fix formatting
VelikovPetar May 21, 2026
d38c049
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar May 21, 2026
4689606
refactor(dao): Update CHANGELOG.md
VelikovPetar May 21, 2026
edb86f7
refactor(dao): Apply formatting
VelikovPetar May 21, 2026
b580ccd
refactor(dao): enhance message pagination logic to support inclusive …
VelikovPetar May 25, 2026
52f0c26
refactor(dao): Update docs
VelikovPetar May 25, 2026
d7a18d6
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar May 25, 2026
2e719f4
refactor(dao): Add message.id tiebreakier
VelikovPetar May 25, 2026
35569a0
refactor(dao): Add message.id tiebreaker
VelikovPetar May 25, 2026
fa6898e
refactor(dao): Update CHANGELOG.md
VelikovPetar May 25, 2026
1a727ec
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar May 29, 2026
0e709eb
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar Jun 3, 2026
e49ffb7
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar Jun 5, 2026
8d408e9
Merge branch 'master' into feature/FLU-485_optimize_read_message_from_db
VelikovPetar Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
# Upcoming
## Upcoming Changes

🚀 Performance

- Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method.
- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.

✅ Added

- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.

🐞 Fixed

- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.


## 9.24.0

Expand Down
130 changes: 93 additions & 37 deletions packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:math';

import 'package:drift/drift.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
Expand Down Expand Up @@ -171,6 +169,38 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final (
lessThanCursor,
lessThanOrEqualCursor,
greaterThanCursor,
greaterThanOrEqualCursor,
) = await (
_lookupCursor(messagePagination?.lessThan),
_lookupCursor(messagePagination?.lessThanOrEqual),
_lookupCursor(messagePagination?.greaterThan),
_lookupCursor(messagePagination?.greaterThanOrEqual),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
// recent (closest to a `lessThan` cursor, or the channel tail when no
// cursor is set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCursor != null || greaterThanOrEqualCursor != null) &&
lessThanCursor == null &&
lessThanOrEqualCursor == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(messages.createdAt),
OrderingTerm.asc(messages.id),
]
: [
OrderingTerm.desc(messages.createdAt),
OrderingTerm.desc(messages.id),
];

final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
Expand All @@ -180,44 +210,52 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
])
..where(messages.channelCid.equals(cid))
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(messages.createdAt)]);
..orderBy(orderBy);

final result = await query.get();
if (result.isEmpty) return [];

final msgList = await Future.wait(
result.map(
(row) => _messageFromJoinRow(
row,
fetchDraft: fetchDraft,
),
),
);
// Cursor predicates compare the full `(createdAt, id)` tuple — the same
// key used in ORDER BY — so messages sharing a `createdAt` with the cursor
// fall on the correct side of the boundary. Filtering on `createdAt` alone
// would skip or repeat those siblings across pages.
if (lessThanCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerThanValue(c.id)),
);
}
if (lessThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isSmallerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isSmallerOrEqualValue(c.id)),
);
}
if (greaterThanCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerThanValue(c.id)),
);
}
if (greaterThanOrEqualCursor case final c?) {
query.where(
messages.createdAt.isBiggerThanValue(c.createdAt) |
(messages.createdAt.equals(c.createdAt) &
messages.id.isBiggerOrEqualValue(c.id)),
);
}

if (msgList.isNotEmpty) {
if (messagePagination?.lessThan != null) {
final lessThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.lessThan,
);
if (lessThanIndex != -1) {
msgList.removeRange(lessThanIndex, msgList.length);
}
}
if (messagePagination?.greaterThan != null) {
final greaterThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.greaterThan,
);
if (greaterThanIndex != -1) {
msgList.removeRange(0, greaterThanIndex);
}
}
if (messagePagination?.limit != null) {
return msgList
.skip(max(0, msgList.length - messagePagination!.limit))
.toList();
}
if (messagePagination != null) {
query.limit(messagePagination.limit);
}
Comment on lines +248 to 250
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we only limit this when either lessThanCutoff or greaterThanCutoff is not null? I don't know if it can happen that the target message is not in the cache, but in that case limiting the amount of messages might give issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually matches the previous behaviour of the method. The limit was always applied, regardless of whether we have a lessThan/greaterThan ID supplied, and regardless of whether the corresponding message was found in the cache.

if (msgList.isNotEmpty) {
      if (messagePagination?.lessThan != null) {
        final lessThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.lessThan,
        );
        if (lessThanIndex != -1) {
          msgList.removeRange(lessThanIndex, msgList.length);
        }
      }
      if (messagePagination?.greaterThan != null) {
        final greaterThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.greaterThan,
        );
        if (greaterThanIndex != -1) {
          msgList.removeRange(0, greaterThanIndex);
        }
      }
      if (messagePagination?.limit != null) {
        return msgList
            .skip(max(0, msgList.length - messagePagination!.limit))
            .toList();
      }
    }

But now that you mentioned it, we actually have several smaller issues that neither the 'old' nor the 'new' implementation cover:

  1. Limit is used wrongly in the greaterThan path: We always take the last items from the fetch, which means that we skip the first messages which are after the greaterThan ID. IMO, we should retrieve the limit amount of messages right after the cursor
  2. We are missing handling for greaterThanOrEqual/lessThanOrEqual in the method
  3. greaterThan case returns the cursor (the pivot message) as well, when it shouldn't

Should we fix these things in this PR? Or do you think it is better to handle them in a follow up?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they can be fixed in this PR. You already change the method quite a bit, so would be good to make it correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will address this!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adressed here: b580ccd

return msgList;

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();

return Future.wait(
orderedRows
.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)),
);
}

/// Updates the message data of a particular channel with
Expand All @@ -241,4 +279,22 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
(batch) => batch.insertAllOnConflictUpdate(messages, entities),
);
}

/// Returns the `(createdAt, id)` cursor for the message with [id] in the
/// local cache, or `null` if [id] is null, the message isn't cached, or
/// isn't visible in the channel (i.e. a thread reply with
/// `showInChannel = false`).
Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
if (id == null) return null;
final createdAt = await (selectOnly(messages)
..addColumns([messages.createdAt])
..where(messages.id.equals(id))
..where(
messages.parentId.isNull() | messages.showInChannel.equals(true),
))
.map((row) => row.read(messages.createdAt))
.getSingleOrNull();
if (createdAt == null) return null;
return (createdAt: createdAt, id: id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>

/// Returns all the reactions of a particular message by matching
/// [Reactions.messageId] with [messageId]
Future<List<Reaction>> getReactions(String messageId) =>
(select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(pinnedMessageReactions.messageId.equals(messageId))
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]))
.map((rows) {
final userEntity = rows.readTableOrNull(users);
final reactionEntity = rows.readTable(pinnedMessageReactions);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
Future<List<Reaction>> getReactions(String messageId) {
final where = pinnedMessageReactions.messageId.equals(messageId);
return _selectReactions(where);
}

/// Returns all the reactions of a particular message
/// added by a particular user by matching
Expand All @@ -35,9 +28,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
Future<List<Reaction>> getReactionsByUserId(
String messageId,
String userId,
) async {
final reactions = await getReactions(messageId);
return reactions.where((it) => it.userId == userId).toList();
) {
final where = pinnedMessageReactions.messageId.equals(messageId) &
pinnedMessageReactions.userId.equals(userId);
return _selectReactions(where);
}

/// Updates the reactions data with the new [reactionList] data
Expand All @@ -57,4 +51,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
(r) => r.messageId.isIn(messageIds),
);
});

Future<List<Reaction>> _selectReactions(Expression<bool> where) {
final rows = select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(where)
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]);
return rows.map((row) {
final reactionEntity = row.readTable(pinnedMessageReactions);
final userEntity = row.readTableOrNull(users);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
}
}
34 changes: 20 additions & 14 deletions packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>

/// Returns all the reactions of a particular message by matching
/// [Reactions.messageId] with [messageId]
Future<List<Reaction>> getReactions(String messageId) =>
(select(reactions).join([
leftOuterJoin(users, reactions.userId.equalsExp(users.id)),
])
..where(reactions.messageId.equals(messageId))
..orderBy([OrderingTerm.asc(reactions.createdAt)]))
.map((rows) {
final userEntity = rows.readTableOrNull(users);
final reactionEntity = rows.readTable(reactions);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
Future<List<Reaction>> getReactions(String messageId) {
final where = reactions.messageId.equals(messageId);
return _selectReactions(where);
}

/// Returns all the reactions of a particular message
/// added by a particular user by matching
Expand All @@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
Future<List<Reaction>> getReactionsByUserId(
String messageId,
String userId,
) async {
final reactions = await getReactions(messageId);
return reactions.where((it) => it.userId == userId).toList();
) {
final where =
reactions.messageId.equals(messageId) & reactions.userId.equals(userId);
return _selectReactions(where);
}

/// Updates the reactions data with the new [reactionList] data
Expand All @@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
(r) => r.messageId.isIn(messageIds),
);
});

Future<List<Reaction>> _selectReactions(Expression<bool> where) {
final rows = select(reactions)
.join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))])
..where(where)
..orderBy([OrderingTerm.asc(reactions.createdAt)]);
return rows.map((row) {
final reactionEntity = row.readTable(reactions);
final userEntity = row.readTableOrNull(users);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
}
}
Loading
Loading