Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/model/account.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package model

import "strings"

// IsBotAccount reports whether an account name denotes a bot/pseudo user.
// The rule — a ".bot" suffix or a "p_" prefix — is the single source of truth
// for bot classification and is equivalent to the regex `(\.bot$|^p_)` used by
// pkg/pipelines and room-service. Subscriptions store the result in u.isBot so
// member-count reconciliation can split user vs app counts off an indexed field
// instead of evaluating a regex per document on every read.
func IsBotAccount(account string) bool {
return strings.HasSuffix(account, ".bot") || strings.HasPrefix(account, "p_")
}
34 changes: 34 additions & 0 deletions pkg/model/account_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package model_test

import (
"testing"

"github.com/hmchangw/chat/pkg/model"
)

func TestIsBotAccount(t *testing.T) {
tests := []struct {
name string
account string
want bool
}{
{name: "dot-bot suffix", account: "weather.bot", want: true},
{name: "bare dot-bot", account: ".bot", want: true},
{name: "p_ prefix webhook", account: "p_webhook", want: true},
{name: "bare p_ prefix", account: "p_", want: true},
{name: "plain human account", account: "alice", want: false},
{name: "ends with bot but no dot", account: "robot", want: false},
{name: "dot-bot not at end", account: "alice.bot.com", want: false},
{name: "p underscore not at start", account: "alice_p_x", want: false},
{name: "case sensitive prefix", account: "P_webhook", want: false},
{name: "case sensitive suffix", account: "weather.BOT", want: false},
{name: "empty", account: "", want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := model.IsBotAccount(tt.account); got != tt.want {
t.Errorf("IsBotAccount(%q) = %v, want %v", tt.account, got, tt.want)
}
})
}
}
8 changes: 8 additions & 0 deletions room-service/store_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func (s *MongoStore) EnsureIndexes(ctx context.Context) error {
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,lastSeenAt) index: %w", err)
}
// Backs room-worker's ReconcileMemberCounts, which counts bot vs non-bot
// subs per room off u.isBot — keeps both CountDocuments index-only instead
// of scanning every subscription in the room.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Keys: bson.D{{Key: "roomId", Value: 1}, {Key: "u.isBot", Value: 1}},
}); err != nil {
return fmt.Errorf("ensure subscriptions (roomId,u.isBot) index: %w", err)
}
// Lookup index for FindDMSubscription (filters on u.account+name).
// Without this index, FindDMSubscription falls back to a collection scan.
if _, err := s.subscriptions.Indexes().CreateOne(ctx, mongo.IndexModel{
Expand Down
24 changes: 8 additions & 16 deletions room-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,16 +866,9 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
actualAccounts := make([]string, 0, len(needSub))
for _, c := range needSub {
user := userMap[c.Account]
sub := &model.Subscription{
ID: idgen.GenerateUUIDv7(),
User: model.SubscriptionUser{ID: user.ID, Account: user.Account},
RoomID: req.RoomID,
Name: room.Name,
RoomType: model.RoomTypeChannel,
SiteID: room.SiteID,
Roles: []model.Role{model.RoleMember},
JoinedAt: acceptedAt,
}
// newSub stamps u.isBot from the account; room is the channel fetched by
// req.RoomID so RoomType/SiteID/Name/ID all match the prior inline build.
sub := newSub(idgen.GenerateUUIDv7(), &user, room, []model.Role{model.RoleMember}, room.Name, false, acceptedAt)
// Resolve once via the shared helper so the local sub, the per-user
// SubscriptionUpdateEvent fan-out, and the cross-site MemberAddEvent
// all carry the same HistorySharedSince value.
Expand Down Expand Up @@ -1188,7 +1181,7 @@ func newSub(id string, user *model.User, room *model.Room, roles []model.Role,
name string, isSubscribed bool, joinedAt time.Time) *model.Subscription {
return &model.Subscription{
ID: id,
User: model.SubscriptionUser{ID: user.ID, Account: user.Account},
User: model.SubscriptionUser{ID: user.ID, Account: user.Account, IsBot: model.IsBotAccount(user.Account)},
RoomID: room.ID,
SiteID: room.SiteID,
Roles: roles,
Expand Down Expand Up @@ -1311,13 +1304,12 @@ func (h *Handler) processCreateRoom(ctx context.Context, data []byte) (err error
}
}

// determineRoomTypeFromPayload mirrors room-service's determineRoomType on the canonical payload.
// botPattern matches both ".bot" suffix and "p_" prefix to classify webhook-style bots
// consistently with room-service/helper.go and pkg/pipelines.
// determineRoomTypeFromPayload mirrors room-service's determineRoomType on the
// canonical payload. model.IsBotAccount classifies webhook-style bots (".bot"
// suffix or "p_" prefix) consistently with room-service/helper.go and pkg/pipelines.
func determineRoomTypeFromPayload(req *model.CreateRoomRequest) model.RoomType {
if req.Name == "" && len(req.Orgs) == 0 && len(req.Channels) == 0 && len(req.Users) == 1 {
acct := req.Users[0]
if strings.HasSuffix(acct, ".bot") || strings.HasPrefix(acct, "p_") {
if model.IsBotAccount(req.Users[0]) {
return model.RoomTypeBotDM
}
return model.RoomTypeDM
Expand Down
5 changes: 4 additions & 1 deletion room-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,11 @@ func TestReconcileMemberCountsSplitsBots(t *testing.T) {
mustInsertSub(t, db, &model.Subscription{
ID: "s3", User: model.SubscriptionUser{Account: "carol"}, RoomID: "r1",
})
// Bot subs carry u.isBot=true in production (stamped by newSub via
// model.IsBotAccount); set it explicitly here since the test inserts the
// document directly. ReconcileMemberCounts now counts off this flag.
mustInsertSub(t, db, &model.Subscription{
ID: "s4", User: model.SubscriptionUser{Account: "weather.bot"}, RoomID: "r1",
ID: "s4", User: model.SubscriptionUser{Account: "weather.bot", IsBot: true}, RoomID: "r1",
})
mustInsertRoom(t, db, &model.Room{ID: "r1", Type: model.RoomTypeChannel})

Expand Down
5 changes: 3 additions & 2 deletions room-worker/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type SubscriptionStore interface {
// ListByRoom returns all subscriptions for roomID across every site.
ListByRoom(ctx context.Context, roomID string) ([]model.Subscription, error)
// ReconcileMemberCounts recomputes Room.UserCount (non-bot subs) and
// Room.AppCount (bot subs) by scanning the subscriptions collection,
// then writes both back to the rooms collection in a single update.
// Room.AppCount (bot subs) via index-backed counts on the denormalized
// u.isBot flag, then writes both back to the rooms collection in a single
// update.
ReconcileMemberCounts(ctx context.Context, roomID string) error
GetRoom(ctx context.Context, roomID string) (*model.Room, error)
GetSubscription(ctx context.Context, account, roomID string) (*model.Subscription, error)
Expand Down
63 changes: 17 additions & 46 deletions room-worker/store_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,59 +43,30 @@ func (s *MongoStore) ListByRoom(ctx context.Context, roomID string) ([]model.Sub
return subs, nil
}

// ReconcileMemberCounts counts the room's subscriptions, splitting on
// the bot account naming pattern to produce both UserCount (non-bot) and
// AppCount (bot). A single $group aggregation does both buckets in one
// collection scan (was: two CountDocuments queries). Writes both fields
// to the rooms collection in a single updateOne. The regex must stay in
// lockstep with pkg/pipelines.GetNewMembersPipeline — both classify
// accounts matching `.bot$|^p_` as bots.
// ReconcileMemberCounts recomputes the room's AppCount (bot subs) and UserCount
// (everyone else) and writes both back in a single updateOne. AppCount is an
// index-backed CountDocuments on {roomId, u.isBot} (the flag is stamped at
// sub-creation via model.IsBotAccount) and UserCount is total minus bots — both
// counts use the index and no per-document regex runs. Deriving UserCount by
// subtraction also means legacy docs written before u.isBot existed (and any
// missing the field) correctly fall into UserCount rather than being dropped.
// Recompute-and-$set keeps the counts idempotent under JetStream redelivery.
func (s *MongoStore) ReconcileMemberCounts(ctx context.Context, roomID string) error {
const botRegex = `(\.bot$|^p_)`
pipe := []bson.M{
{"$match": bson.M{"roomId": roomID}},
{"$group": bson.M{
"_id": nil,
"appCount": bson.M{"$sum": bson.M{
"$cond": []any{
bson.M{"$regexMatch": bson.M{"input": "$u.account", "regex": botRegex}},
1, 0,
},
}},
"userCount": bson.M{"$sum": bson.M{
"$cond": []any{
bson.M{"$regexMatch": bson.M{"input": "$u.account", "regex": botRegex}},
0, 1,
},
}},
}},
}
cur, err := s.subscriptions.Aggregate(ctx, pipe)
// A transient count error must not fall through to an UpdateOne with zero
// counts, which would clobber the rooms doc.
total, err := s.subscriptions.CountDocuments(ctx, bson.M{"roomId": roomID})
if err != nil {
return fmt.Errorf("aggregate member counts: %w", err)
}
defer cur.Close(ctx)

var counts struct {
UserCount int64 `bson:"userCount"`
AppCount int64 `bson:"appCount"`
return fmt.Errorf("count subscriptions: %w", err)
}
if cur.Next(ctx) {
if err := cur.Decode(&counts); err != nil {
return fmt.Errorf("decode member counts: %w", err)
}
} else if err := cur.Err(); err != nil {
// A cursor failure must not silently fall through to an UpdateOne with
// zero counts, which would clobber the rooms doc on a transient error.
return fmt.Errorf("iterate member counts: %w", err)
appCount, err := s.subscriptions.CountDocuments(ctx, bson.M{"roomId": roomID, "u.isBot": true})
if err != nil {
return fmt.Errorf("count app subscriptions: %w", err)
}
// No rows match → both counts stay 0, which is the correct reset behavior
// for a room whose last subscription was just removed.

if _, err := s.rooms.UpdateOne(ctx, bson.M{"_id": roomID}, bson.M{
"$set": bson.M{
"userCount": counts.UserCount,
"appCount": counts.AppCount,
"userCount": total - appCount,
"appCount": appCount,
"updatedAt": time.Now().UTC(),
},
}); err != nil {
Expand Down
27 changes: 27 additions & 0 deletions room-worker/subscription_isbot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/hmchangw/chat/pkg/model"
)

// TestNewSub_SetsIsBotFromAccount asserts subscriptions are stamped with the
// bot classification at creation time, so member-count reconciliation can read
// u.isBot directly instead of re-deriving it with a per-document regex.
func TestNewSub_SetsIsBotFromAccount(t *testing.T) {
room := &model.Room{ID: "r1", SiteID: "site-a", Type: model.RoomTypeChannel}
ts := time.Now().UTC()

bot := newSub("s1", &model.User{ID: "u_bot", Account: "helper.bot"}, room, nil, "n", false, ts)
assert.True(t, bot.User.IsBot, "helper.bot must be flagged as a bot")

pseudo := newSub("s2", &model.User{ID: "u_p", Account: "p_webhook"}, room, nil, "n", false, ts)
assert.True(t, pseudo.User.IsBot, "p_ prefix must be flagged as a bot")

human := newSub("s3", &model.User{ID: "u_h", Account: "alice"}, room, nil, "n", false, ts)
assert.False(t, human.User.IsBot, "human account must not be flagged")
}
Loading