Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

105 changes: 23 additions & 82 deletions inbox-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"

"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/hmchangw/chat/pkg/idgen"
"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/natsutil"
)

// InboxStore abstracts the data store operations needed by the inbox worker.
Expand Down Expand Up @@ -61,8 +61,6 @@ func (h *Handler) HandleEvent(ctx context.Context, data []byte) error {
return h.handleSubscriptionRead(ctx, &evt)
case "thread_subscription_upserted":
return h.handleThreadSubscriptionUpserted(ctx, &evt)
case model.MessageTypeRoomCreated:
return h.handleRoomCreated(ctx, &evt)
default:
slog.Warn("unknown event type, skipping", "type", evt.Type)
return nil
Expand All @@ -75,6 +73,11 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent)
return fmt.Errorf("unmarshal member_added payload: %w", err)
}

roomType := event.RoomType
if roomType == "" {
roomType = model.RoomTypeChannel
}

users, err := h.store.FindUsersByAccounts(ctx, event.Accounts)
if err != nil {
return fmt.Errorf("find users by accounts: %w", err)
Expand All @@ -98,25 +101,28 @@ func (h *Handler) handleMemberAdded(ctx context.Context, evt *model.OutboxEvent)
slog.Warn("user not found for account", "account", account)
continue
}
// RoomType is fixed to channel: cross-site member_added events only
// originate from rooms that support add-member (channel/discussion),
// never from DM/botDM.
sub := &model.Subscription{
ID: idgen.GenerateUUIDv7(),
User: model.SubscriptionUser{ID: user.ID, Account: user.Account},
RoomID: event.RoomID,
RoomType: model.RoomTypeChannel,
RoomType: roomType,
SiteID: event.SiteID,
Roles: []model.Role{model.RoleMember},
Name: event.RoomName,
Roles: rolesForType(roomType),
Name: subscriptionName(roomType, event.RoomName, event.RequesterAccount),
IsSubscribed: subscriptionIsSubscribed(roomType, &user),
HistorySharedSince: historySharedSince,
JoinedAt: joinedAt,
}
subs = append(subs, sub)
}

if len(subs) == 0 {
return nil
}
if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil {
return fmt.Errorf("bulk create subscriptions: %w", err)
if !mongo.IsDuplicateKeyError(err) {
return fmt.Errorf("bulk create subscriptions: %w", err)
}
}

// No SubscriptionUpdateEvent is published here — room-worker already publishes
Expand Down Expand Up @@ -209,23 +215,19 @@ func (h *Handler) handleThreadSubscriptionUpserted(ctx context.Context, evt *mod
return nil
}

// errPermanent signals a non-retryable error; callers should Ack and move on.
var errPermanent = errors.New("permanent")

func rolesForType(t model.RoomType) []model.Role {
if t == model.RoomTypeChannel {
return []model.Role{model.RoleMember}
}
return nil
}

func subscriptionName(d *model.RoomCreatedOutbox, u *model.User) string {
switch d.RoomType {
func subscriptionName(roomType model.RoomType, roomName, requesterAccount string) string {
switch roomType {
case model.RoomTypeChannel, model.RoomTypeDiscussion:
return d.RoomName
return roomName
case model.RoomTypeDM, model.RoomTypeBotDM:
// On the remote site, the "other party" relative to u is the requester.
return d.RequesterAccount
return requesterAccount
}
return ""
}
Expand All @@ -236,70 +238,9 @@ func isBot(account string) bool {
return strings.HasSuffix(account, ".bot") || strings.HasPrefix(account, "p_")
}

func subscriptionIsSubscribed(d *model.RoomCreatedOutbox, u *model.User) bool {
if d.RoomType != model.RoomTypeBotDM {
func subscriptionIsSubscribed(roomType model.RoomType, u *model.User) bool {
if roomType != model.RoomTypeBotDM {
return false
}
return !isBot(u.Account)
}

func (h *Handler) handleRoomCreated(ctx context.Context, evt *model.OutboxEvent) error {
requestID := natsutil.RequestIDFromContext(ctx)
if requestID == "" {
return fmt.Errorf("missing X-Request-ID: %w", errPermanent)
}

var data model.RoomCreatedOutbox
if err := json.Unmarshal(evt.Payload, &data); err != nil {
return fmt.Errorf("unmarshal room_created payload: %w: %w", err, errPermanent)
}
if len(data.Accounts) == 0 {
slog.Warn("room_created event with empty Accounts list",
"requestId", requestID, "roomId", data.RoomID)
return nil
}

users, err := h.store.FindUsersByAccounts(ctx, data.Accounts)
if err != nil {
return fmt.Errorf("find users by accounts: %w", err)
}
// FindUsersByAccounts can return a subset; treat any account in
// data.Accounts that didn't come back as a hard failure rather than
// silently materializing partial remote-side state with no retry signal.
userByAccount := make(map[string]model.User, len(users))
for i := range users {
userByAccount[users[i].Account] = users[i]
}
for _, account := range data.Accounts {
if _, ok := userByAccount[account]; !ok {
return fmt.Errorf("find users by accounts: missing account %q (room %s home %s)",
account, data.RoomID, data.HomeSiteID)
}
}

acceptedAt := time.UnixMilli(data.Timestamp).UTC()
subs := make([]*model.Subscription, 0, len(data.Accounts))
for _, account := range data.Accounts {
u := userByAccount[account]
sub := &model.Subscription{
ID: idgen.GenerateUUIDv7(),
User: model.SubscriptionUser{ID: u.ID, Account: u.Account},
RoomID: data.RoomID,
SiteID: data.HomeSiteID,
Roles: rolesForType(data.RoomType),
Name: subscriptionName(&data, &u),
RoomType: data.RoomType,
IsSubscribed: subscriptionIsSubscribed(&data, &u),
JoinedAt: acceptedAt,
}
subs = append(subs, sub)
}

if len(subs) == 0 {
return nil
}
if err := h.store.BulkCreateSubscriptions(ctx, subs); err != nil {
return fmt.Errorf("bulk create subs: %w", err)
}
return nil
}
Loading
Loading