diff --git a/packages/stream_chat_persistence/CHANGELOG.md b/packages/stream_chat_persistence/CHANGELOG.md index 553792d0e..263bf65cd 100644 --- a/packages/stream_chat_persistence/CHANGELOG.md +++ b/packages/stream_chat_persistence/CHANGELOG.md @@ -4,6 +4,7 @@ - Reduce the number of DB reads in the `ChatPersistenceClient.getChannelStates` method. - Read only the messages matching the `PaginationParams` from DB when calling `MessageDao.getMessagesByCid` instead of reading all messages for the channel and applying pagination in memory. +- Read only the messages matching the `PaginationParams` from DB when calling `PinnedMessageDao.getMessagesByCid` instead of reading all pinned messages for the channel and applying pagination in memory. - Read only the reactions matching the `userId` from DB when calling `ReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Read only the reactions matching the `userId` from DB when calling `PinnedMessageReactionDao.getReactionsByUserId` instead of reading all reactions for the message and filtering in memory. - Improve the message read times from DB. @@ -13,10 +14,8 @@ - `MessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. - `MessageDao.getMessagesByCid` now treats `PaginationParams.greaterThan` as strict (exclusive of the cursor), matching the `PaginationParams` contract and the existing `lessThan` behaviour. - `MessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail — mirroring how `lessThan` already returned the messages immediately before the pivot. - - -🐛 Fixed - +- `PinnedMessageDao.getMessagesByCid` now honours `PaginationParams.lessThanOrEqual` and `PaginationParams.greaterThanOrEqual` (inclusive of the cursor message), in addition to the existing strict `lessThan`/`greaterThan`. +- `PinnedMessageDao.getMessagesByCid` with a forward cursor (`greaterThan`/`greaterThanOrEqual`) and a `limit` now returns the messages immediately AFTER the pivot, instead of the channel tail. - Fixed missing persistence of the `team` field on channel entities. 🔄 Changed diff --git a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart index 75b0f2054..dc9deafa8 100644 --- a/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart +++ b/packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart @@ -243,6 +243,38 @@ class PinnedMessageDao extends DatabaseAccessor bool fetchDraft = true, PaginationParams? messagePagination, }) async { + final ( + lessThanCursor, + lessThanOrEqualCursor, + greaterThanCursor, + greaterThanOrEqualCursor, + ) = await ( + _lookupCursor(messagePagination?.lessThan), + _lookupCursor(messagePagination?.lessThanOrEqual), + _lookupCursor(messagePagination?.greaterThan), + _lookupCursor(messagePagination?.greaterThanOrEqual), + ).wait; + + // When the caller is paginating forward (greaterThan / greaterThanOrEqual + // only), order ASC so the SQL `LIMIT` retains the N messages immediately + // AFTER the cursor. Otherwise order DESC so `LIMIT` retains the N most + // recent (closest to a `lessThan` cursor, or the channel tail when no + // cursor is set). The final result is always reshaped to ASC for display. + final isForwardPagination = + (greaterThanCursor != null || greaterThanOrEqualCursor != null) && + lessThanCursor == null && + lessThanOrEqualCursor == null; + + final orderBy = isForwardPagination + ? [ + OrderingTerm.asc(pinnedMessages.createdAt), + OrderingTerm.asc(pinnedMessages.id), + ] + : [ + OrderingTerm.desc(pinnedMessages.createdAt), + OrderingTerm.desc(pinnedMessages.id), + ]; + final query = select(pinnedMessages).join([ leftOuterJoin(_users, pinnedMessages.userId.equalsExp(_users.id)), leftOuterJoin( @@ -253,35 +285,48 @@ class PinnedMessageDao extends DatabaseAccessor ..where(pinnedMessages.channelCid.equals(cid)) ..where(pinnedMessages.parentId.isNull() | pinnedMessages.showInChannel.equals(true)) - ..orderBy([OrderingTerm.asc(pinnedMessages.createdAt)]); + ..orderBy(orderBy); - final rows = await query.get(); - final msgList = await _messagesFromJoinRows(rows, fetchDraft: fetchDraft); + // Cursor predicates compare the full `(createdAt, id)` tuple — the same + // key used in ORDER BY — so messages sharing a `createdAt` with the cursor + // fall on the correct side of the boundary. Filtering on `createdAt` alone + // would skip or repeat those siblings across pages. + if (lessThanCursor case final c?) { + query.where( + pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) | + (pinnedMessages.createdAt.equals(c.createdAt) & + pinnedMessages.id.isSmallerThanValue(c.id)), + ); + } + if (lessThanOrEqualCursor case final c?) { + query.where( + pinnedMessages.createdAt.isSmallerThanValue(c.createdAt) | + (pinnedMessages.createdAt.equals(c.createdAt) & + pinnedMessages.id.isSmallerOrEqualValue(c.id)), + ); + } + if (greaterThanCursor case final c?) { + query.where( + pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) | + (pinnedMessages.createdAt.equals(c.createdAt) & + pinnedMessages.id.isBiggerThanValue(c.id)), + ); + } + if (greaterThanOrEqualCursor case final c?) { + query.where( + pinnedMessages.createdAt.isBiggerThanValue(c.createdAt) | + (pinnedMessages.createdAt.equals(c.createdAt) & + pinnedMessages.id.isBiggerOrEqualValue(c.id)), + ); + } - if (msgList.isNotEmpty) { - final mutable = msgList.toList(); - if (messagePagination?.lessThan != null) { - final lessThanIndex = mutable.indexWhere( - (m) => m.id == messagePagination!.lessThan, - ); - if (lessThanIndex != -1) { - mutable.removeRange(lessThanIndex, mutable.length); - } - } - if (messagePagination?.greaterThan != null) { - final greaterThanIndex = mutable.indexWhere( - (m) => m.id == messagePagination!.greaterThan, - ); - if (greaterThanIndex != -1) { - mutable.removeRange(0, greaterThanIndex); - } - } - if (messagePagination?.limit != null) { - return mutable.take(messagePagination!.limit).toList(); - } - return mutable; + if (messagePagination != null) { + query.limit(messagePagination.limit); } - return msgList; + + final rows = await query.get(); + final orderedRows = isForwardPagination ? rows : rows.reversed.toList(); + return _messagesFromJoinRows(orderedRows, fetchDraft: fetchDraft); } /// Updates the message data of a particular channel with @@ -305,4 +350,23 @@ class PinnedMessageDao extends DatabaseAccessor (batch) => batch.insertAllOnConflictUpdate(pinnedMessages, entities), ); } + + /// Returns the `(createdAt, id)` cursor for the pinned message with [id] in + /// the local cache, or `null` if [id] is null, the message isn't cached, or + /// isn't visible in the channel (i.e. a thread reply with + /// `showInChannel = false`). + Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async { + if (id == null) return null; + final createdAt = await (selectOnly(pinnedMessages) + ..addColumns([pinnedMessages.createdAt]) + ..where(pinnedMessages.id.equals(id)) + ..where( + pinnedMessages.parentId.isNull() | + pinnedMessages.showInChannel.equals(true), + )) + .map((row) => row.read(pinnedMessages.createdAt)) + .getSingleOrNull(); + if (createdAt == null) return null; + return (createdAt: createdAt, id: id); + } } diff --git a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart index e86808d08..4e412de09 100644 --- a/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart +++ b/packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart @@ -25,6 +25,12 @@ void main() { }) async { final channels = [ChannelModel(cid: cid)]; final users = List.generate(count, (index) => User(id: 'testUserId$index')); + // Strictly monotonic `createdAt` per message so SQL-side pagination + // filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be + // confused by ties. Drift stores `DateTime` as integer Unix seconds by + // default, so the offset must be at least 1 second per row — otherwise + // sub-second offsets all round-trip onto the same second. + final baseTime = DateTime.now(); final messages = List.generate( count, (index) => Message( @@ -32,7 +38,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -54,7 +60,7 @@ void main() { type: 'testType', user: users[index], channelRole: 'channel_member', - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -75,7 +81,7 @@ void main() { channelRole: 'channel_member', parentId: mapAllThreadToFirstMessage ? messages[0].id : messages[index].id, - createdAt: DateTime.now(), + createdAt: baseTime.add(Duration(seconds: index)), shadowed: math.Random().nextBool(), replyCount: index, updatedAt: DateTime.now(), @@ -380,8 +386,294 @@ void main() { messagePagination: pagination, ); expect(fetchedMessages.length, limit); - expect(fetchedMessages.first.id, greaterThan); - expect(fetchedMessages.last.id != lessThan, true); + expect(fetchedMessages.first.id, 'testMessageId${cid}10'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + group('getMessagesByCid pagination', () { + const cid = 'test:Cid'; + + test('lessThan only trims messages from the end', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('greaterThan only trims messages from the start (exclusive)', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 24); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('limit only keeps the last N messages', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(limit: 15), + ); + + expect(fetchedMessages.length, 15); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('lessThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('greaterThan id not in result set is a no-op', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'missing-id', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('thread-reply id as cursor is a no-op (not visible in channel)', + () async { + // `_prepareTestData` inserts thread replies with `parentId` set and + // `showInChannel` left null — i.e. not visible in the channel query. + // Passing such an id as a cursor must resolve to a no-op so the main + // query falls back to returning the full channel slice. + await _prepareTestData(cid, count: 30, threads: true); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'testThreadMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 30); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default PaginationParams() applies implicit limit of 10', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams(), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}20'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThan returns last 10 of filtered set', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}15'); + expect(fetchedMessages.last.id, 'testMessageId${cid}24'); + }); + + test('default limit + greaterThan returns first 10 after the pivot', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThan: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}6'); + expect(fetchedMessages.last.id, 'testMessageId${cid}15'); + }); + + test('lessThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 26); + expect(fetchedMessages.first.id, 'testMessageId${cid}0'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('greaterThanOrEqual is inclusive of the pivot', () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 25); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}29'); + }); + + test('default limit + lessThanOrEqual returns the pivot and 9 before', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThanOrEqual: 'testMessageId${cid}25', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}16'); + expect(fetchedMessages.last.id, 'testMessageId${cid}25'); + }); + + test('default limit + greaterThanOrEqual returns the pivot and 9 after', + () async { + await _prepareTestData(cid, count: 30); + + final fetchedMessages = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + greaterThanOrEqual: 'testMessageId${cid}5', + ), + ); + + expect(fetchedMessages.length, 10); + expect(fetchedMessages.first.id, 'testMessageId${cid}5'); + expect(fetchedMessages.last.id, 'testMessageId${cid}14'); + }); + + test('cursor with tied createdAt does not skip or duplicate siblings', + () async { + // Three messages share an identical `createdAt`. The SQL ORDER BY uses + // the `(createdAt, id)` tuple, so within the trio the relative order is + // by id (lexicographic). A cursor at `msg_tieB` must split the trio + // cleanly: `msg_tieA` lands on the "before" side, `msg_tieC` on the + // "after" side. A `createdAt`-only WHERE predicate would collapse all + // three into the cursor's bucket and drop or keep them together. + final users = [User(id: 'tieUser')]; + await database.userDao.updateUsers(users); + await database.channelDao.updateChannels([ChannelModel(cid: cid)]); + + final tie = DateTime.now(); + final earlier = tie.subtract(const Duration(seconds: 1)); + final later = tie.add(const Duration(seconds: 1)); + + Message m(String id, DateTime t) => Message( + id: id, + user: users.first, + createdAt: t, + updatedAt: t, + text: id, + ); + + await pinnedMessageDao.updateMessages(cid, [ + m('msg_pre', earlier), + m('msg_tieA', tie), + m('msg_tieB', tie), + m('msg_tieC', tie), + m('msg_post', later), + ]); + + final before = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThan: 'msg_tieB', + ), + ); + expect(before.map((m) => m.id).toList(), ['msg_pre', 'msg_tieA']); + + final after = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThan: 'msg_tieB', + ), + ); + expect(after.map((m) => m.id).toList(), ['msg_tieC', 'msg_post']); + + final atOrBefore = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + lessThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrBefore.map((m) => m.id).toList(), + ['msg_pre', 'msg_tieA', 'msg_tieB'], + ); + + final atOrAfter = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + limit: 100, + greaterThanOrEqual: 'msg_tieB', + ), + ); + expect( + atOrAfter.map((m) => m.id).toList(), + ['msg_tieB', 'msg_tieC', 'msg_post'], + ); + }); }); test('updateMessages', () async { @@ -791,6 +1083,57 @@ void main() { expect(quoting.quotedMessage!.id, quotedId); expect(quoting.quotedMessage!.draft, isNull); }); + + test('getMessagesByCid hydrates reactions under pagination', () async { + await _seedChannel(cid); + final dbUser = User(id: 'testUserId'); + await database.userDao.updateUsers([dbUser]); + + final baseTime = DateTime.now(); + final messages = List.generate( + 30, + (i) => Message( + id: 'p-msg-$i', + user: dbUser, + text: 'msg $i', + createdAt: baseTime.add(Duration(seconds: i)), + ), + ); + await pinnedMessageDao.updateMessages(cid, messages); + + // 2 reactions per message; surviving rows after pagination must still + // carry their full reaction set. + final reactions = [ + for (final m in messages) ...[ + Reaction( + type: 'r1', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt, + ), + Reaction( + type: 'r2', + messageId: m.id, + user: dbUser, + createdAt: m.createdAt.add(const Duration(milliseconds: 1)), + ), + ], + ]; + await database.pinnedMessageReactionDao.updateReactions(reactions); + + final page = await pinnedMessageDao.getMessagesByCid( + cid, + messagePagination: const PaginationParams( + lessThan: 'p-msg-25', + ), + ); + expect(page, hasLength(10)); + for (final m in page) { + expect(m.latestReactions, hasLength(2)); + expect(m.ownReactions, hasLength(2)); + expect(m.latestReactions!.every((r) => r.messageId == m.id), isTrue); + } + }); }); tearDown(() async {