Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/stream_chat_persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
## Upcoming Changes

🚀 Performance

- Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory.
- Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.
- Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory.

✅ Added

- `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`.

🐞 Fixed

- `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour.
- `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot.


## 9.24.0

- Updated `stream_chat` dependency to [`9.24.0`](https://pub.dev/packages/stream_chat/changelog).
Expand Down
118 changes: 81 additions & 37 deletions packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:math';

import 'package:drift/drift.dart';
import 'package:stream_chat/stream_chat.dart';
import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
Expand Down Expand Up @@ -171,6 +169,50 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
bool fetchDraft = true,
PaginationParams? messagePagination,
}) async {
final (
lessThanCutoff,
lessThanOrEqualCutoff,
greaterThanCutoff,
greaterThanOrEqualCutoff,
) = await (
switch (messagePagination?.lessThan) {
final id? => _lookupMessageCreatedAt(id),
_ => Future<DateTime?>.value(),
},
switch (messagePagination?.lessThanOrEqual) {
final id? => _lookupMessageCreatedAt(id),
_ => Future<DateTime?>.value(),
},
switch (messagePagination?.greaterThan) {
final id? => _lookupMessageCreatedAt(id),
_ => Future<DateTime?>.value(),
},
switch (messagePagination?.greaterThanOrEqual) {
final id? => _lookupMessageCreatedAt(id),
_ => Future<DateTime?>.value(),
},
).wait;

// When the caller is paginating forward (greaterThan / greaterThanOrEqual
// only), order ASC so the SQL `LIMIT` retains the N messages immediately
// AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most
// recent (closest to a `lessThan` cursor, or the channel tail when no
// cursor is set). The final result is always reshaped to ASC for display.
final isForwardPagination =
(greaterThanCutoff != null || greaterThanOrEqualCutoff != null) &&
lessThanCutoff == null &&
lessThanOrEqualCutoff == null;

final orderBy = isForwardPagination
? [
OrderingTerm.asc(messages.createdAt),
OrderingTerm.asc(messages.id),
]
: [
OrderingTerm.desc(messages.createdAt),
OrderingTerm.desc(messages.id),
];

final query = select(messages).join([
leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
leftOuterJoin(
Expand All @@ -180,44 +222,32 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
])
..where(messages.channelCid.equals(cid))
..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
..orderBy([OrderingTerm.asc(messages.createdAt)]);
..orderBy(orderBy);

final result = await query.get();
if (result.isEmpty) return [];

final msgList = await Future.wait(
result.map(
(row) => _messageFromJoinRow(
row,
fetchDraft: fetchDraft,
),
),
);
if (lessThanCutoff case final t?) {
query.where(messages.createdAt.isSmallerThanValue(t));
}
if (lessThanOrEqualCutoff case final t?) {
query.where(messages.createdAt.isSmallerOrEqualValue(t));
}
if (greaterThanCutoff case final t?) {
query.where(messages.createdAt.isBiggerThanValue(t));
}
if (greaterThanOrEqualCutoff case final t?) {
query.where(messages.createdAt.isBiggerOrEqualValue(t));
}

if (msgList.isNotEmpty) {
if (messagePagination?.lessThan != null) {
final lessThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.lessThan,
);
if (lessThanIndex != -1) {
msgList.removeRange(lessThanIndex, msgList.length);
}
}
if (messagePagination?.greaterThan != null) {
final greaterThanIndex = msgList.indexWhere(
(m) => m.id == messagePagination!.greaterThan,
);
if (greaterThanIndex != -1) {
msgList.removeRange(0, greaterThanIndex);
}
}
if (messagePagination?.limit != null) {
return msgList
.skip(max(0, msgList.length - messagePagination!.limit))
.toList();
}
if (messagePagination != null) {
query.limit(messagePagination.limit);
}
Comment on lines +248 to 250
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we only limit this when either lessThanCutoff or greaterThanCutoff is not null? I don't know if it can happen that the target message is not in the cache, but in that case limiting the amount of messages might give issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually matches the previous behaviour of the method. The limit was always applied, regardless of whether we have a lessThan/greaterThan ID supplied, and regardless of whether the corresponding message was found in the cache.

if (msgList.isNotEmpty) {
      if (messagePagination?.lessThan != null) {
        final lessThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.lessThan,
        );
        if (lessThanIndex != -1) {
          msgList.removeRange(lessThanIndex, msgList.length);
        }
      }
      if (messagePagination?.greaterThan != null) {
        final greaterThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.greaterThan,
        );
        if (greaterThanIndex != -1) {
          msgList.removeRange(0, greaterThanIndex);
        }
      }
      if (messagePagination?.limit != null) {
        return msgList
            .skip(max(0, msgList.length - messagePagination!.limit))
            .toList();
      }
    }

But now that you mentioned it, we actually have several smaller issues that neither the 'old' nor the 'new' implementation cover:

  1. Limit is used wrongly in the greaterThan path: We always take the last items from the fetch, which means that we skip the first messages which are after the greaterThan ID. IMO, we should retrieve the limit amount of messages right after the cursor
  2. We are missing handling for greaterThanOrEqual/lessThanOrEqual in the method
  3. greaterThan case returns the cursor (the pivot message) as well, when it shouldn't

Should we fix these things in this PR? Or do you think it is better to handle them in a follow up?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they can be fixed in this PR. You already change the method quite a bit, so would be good to make it correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will address this!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adressed here: b580ccd

return msgList;

final rows = await query.get();
final orderedRows = isForwardPagination ? rows : rows.reversed.toList();

return Future.wait(
orderedRows
.map((row) => _messageFromJoinRow(row, fetchDraft: fetchDraft)),
);
}

/// Updates the message data of a particular channel with
Expand All @@ -241,4 +271,18 @@ class MessageDao extends DatabaseAccessor<DriftChatDatabase>
(batch) => batch.insertAllOnConflictUpdate(messages, entities),
);
}

/// Returns the `createdAt` of the message with [id] in the local cache,
/// or `null` if the message isn't cached or isn't visible in the channel
/// (i.e. a thread reply with `showInChannel = false`).
Future<DateTime?> _lookupMessageCreatedAt(String id) {
return (selectOnly(messages)
..addColumns([messages.createdAt])
..where(messages.id.equals(id))
..where(
messages.parentId.isNull() | messages.showInChannel.equals(true),
))
.map((row) => row.read(messages.createdAt))
.getSingleOrNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>

/// Returns all the reactions of a particular message by matching
/// [Reactions.messageId] with [messageId]
Future<List<Reaction>> getReactions(String messageId) =>
(select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(pinnedMessageReactions.messageId.equals(messageId))
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]))
.map((rows) {
final userEntity = rows.readTableOrNull(users);
final reactionEntity = rows.readTable(pinnedMessageReactions);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
Future<List<Reaction>> getReactions(String messageId) {
final where = pinnedMessageReactions.messageId.equals(messageId);
return _selectReactions(where);
}

/// Returns all the reactions of a particular message
/// added by a particular user by matching
Expand All @@ -35,9 +28,10 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
Future<List<Reaction>> getReactionsByUserId(
String messageId,
String userId,
) async {
final reactions = await getReactions(messageId);
return reactions.where((it) => it.userId == userId).toList();
) {
final where = pinnedMessageReactions.messageId.equals(messageId) &
pinnedMessageReactions.userId.equals(userId);
return _selectReactions(where);
}

/// Updates the reactions data with the new [reactionList] data
Expand All @@ -57,4 +51,17 @@ class PinnedMessageReactionDao extends DatabaseAccessor<DriftChatDatabase>
(r) => r.messageId.isIn(messageIds),
);
});

Future<List<Reaction>> _selectReactions(Expression<bool> where) {
final rows = select(pinnedMessageReactions).join([
leftOuterJoin(users, pinnedMessageReactions.userId.equalsExp(users.id)),
])
..where(where)
..orderBy([OrderingTerm.asc(pinnedMessageReactions.createdAt)]);
return rows.map((row) {
final reactionEntity = row.readTable(pinnedMessageReactions);
final userEntity = row.readTableOrNull(users);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
}
}
34 changes: 20 additions & 14 deletions packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>

/// Returns all the reactions of a particular message by matching
/// [Reactions.messageId] with [messageId]
Future<List<Reaction>> getReactions(String messageId) =>
(select(reactions).join([
leftOuterJoin(users, reactions.userId.equalsExp(users.id)),
])
..where(reactions.messageId.equals(messageId))
..orderBy([OrderingTerm.asc(reactions.createdAt)]))
.map((rows) {
final userEntity = rows.readTableOrNull(users);
final reactionEntity = rows.readTable(reactions);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
Future<List<Reaction>> getReactions(String messageId) {
final where = reactions.messageId.equals(messageId);
return _selectReactions(where);
}

/// Returns all the reactions of a particular message
/// added by a particular user by matching
Expand All @@ -35,9 +28,10 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
Future<List<Reaction>> getReactionsByUserId(
String messageId,
String userId,
) async {
final reactions = await getReactions(messageId);
return reactions.where((it) => it.userId == userId).toList();
) {
final where =
reactions.messageId.equals(messageId) & reactions.userId.equals(userId);
return _selectReactions(where);
}

/// Updates the reactions data with the new [reactionList] data
Expand All @@ -57,4 +51,16 @@ class ReactionDao extends DatabaseAccessor<DriftChatDatabase>
(r) => r.messageId.isIn(messageIds),
);
});

Future<List<Reaction>> _selectReactions(Expression<bool> where) {
final rows = select(reactions)
.join([leftOuterJoin(users, reactions.userId.equalsExp(users.id))])
..where(where)
..orderBy([OrderingTerm.asc(reactions.createdAt)]);
return rows.map((row) {
final reactionEntity = row.readTable(reactions);
final userEntity = row.readTableOrNull(users);
return reactionEntity.toReaction(user: userEntity?.toUser());
}).get();
}
}
Loading
Loading