Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions inbox-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ import (
"testing"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
natsmod "github.com/testcontainers/testcontainers-go/modules/nats"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/hmchangw/chat/pkg/model"
"github.com/hmchangw/chat/pkg/natsutil"
"github.com/hmchangw/chat/pkg/stream"
"github.com/hmchangw/chat/pkg/subject"
"github.com/hmchangw/chat/pkg/testutil"
"github.com/hmchangw/chat/pkg/testutil/testimages"
)

func setupMongo(t *testing.T) *mongo.Database {
Expand Down Expand Up @@ -408,3 +414,56 @@ func TestHandleRoomCreatedDM_PersistsRemoteCounterpartSub(t *testing.T) {
assert.Nil(t, bobSub.Roles, "DMs have no roles")
assert.False(t, bobSub.IsSubscribed, "DM does not set IsSubscribed=true")
}

// TestInboxWorker_FilterScoping_Integration verifies the consumer filters
// out the local lane: a local-lane publish stays unreachable to inbox-worker.
func TestInboxWorker_FilterScoping_Integration(t *testing.T) {
const siteID = "site-filter"

ctx := context.Background()
natsContainer, err := natsmod.Run(ctx, testimages.NATS)
require.NoError(t, err)
t.Cleanup(func() { _ = natsContainer.Terminate(ctx) })

natsURL, err := natsContainer.ConnectionString(ctx)
require.NoError(t, err)

nc, err := nats.Connect(natsURL)
require.NoError(t, err)
t.Cleanup(func() { nc.Close() })

js, err := jetstream.New(nc)
require.NoError(t, err)

inboxCfg := stream.Inbox(siteID)
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: inboxCfg.Name,
Subjects: inboxCfg.Subjects,
})
require.NoError(t, err)

cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{subject.InboxAggregateAll(siteID)},
})
require.NoError(t, err)

_, err = js.Publish(ctx, subject.InboxMemberAdded(siteID), []byte(`{"type":"member_added"}`))
require.NoError(t, err)
_, err = js.Publish(ctx, subject.InboxMemberAddedAggregate(siteID), []byte(`{"type":"member_added"}`))
require.NoError(t, err)

require.Eventually(t, func() bool {
info, err := js.Stream(ctx, inboxCfg.Name)
if err != nil {
return false
}
return info.CachedInfo().State.Msgs >= 2
}, 2*time.Second, 50*time.Millisecond, "stream must accept both publishes")

info, err := cons.Info(ctx)
require.NoError(t, err)
assert.EqualValues(t, 1, info.NumPending,
"FilterSubjects must scope inbox-worker to the aggregate.> lane only")
}
7 changes: 5 additions & 2 deletions inbox-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hmchangw/chat/pkg/otelutil"
"github.com/hmchangw/chat/pkg/shutdown"
"github.com/hmchangw/chat/pkg/stream"
"github.com/hmchangw/chat/pkg/subject"
)

type config struct {
Expand Down Expand Up @@ -210,9 +211,11 @@ func main() {

inboxCfg := stream.Inbox(cfg.SiteID)

// Local lane is reserved for search-sync-worker; scope to aggregate.> only.
cons, err := js.CreateOrUpdateConsumer(ctx, inboxCfg.Name, jetstream.ConsumerConfig{
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
Durable: "inbox-worker",
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubjects: []string{subject.InboxAggregateAll(cfg.SiteID)},
})
if err != nil {
slog.Error("create consumer failed", "error", err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/subject/subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ func InboxMemberRemovedAggregate(siteID string) string {
return fmt.Sprintf("chat.inbox.%s.aggregate.member_removed", siteID)
}

// InboxAggregateAll returns the wildcard pattern matching every federated
// (aggregate-lane) event on a site's INBOX stream:
// `chat.inbox.{siteID}.aggregate.>`. Use with
// jetstream.ConsumerConfig.FilterSubjects to scope a consumer to the
// federated lane only — excluding local-lane publishes that are reserved
// for search-sync-worker.
func InboxAggregateAll(siteID string) string {
return fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
}

// InboxMemberEventSubjects returns the subject filters a consumer should use
// to receive both local and federated member_added/member_removed events for
// the given site. Use with jetstream.ConsumerConfig.FilterSubjects (NATS 2.10+).
Expand Down
43 changes: 43 additions & 0 deletions room-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,21 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove
slog.Error("member event publish failed", "error", err, "roomID", req.RoomID)
}

// Wrapper Type collapses to member_removed even for self-leave so
// search-sync-worker dispatches on one MV op; inner Type is preserved.
inboxOutbox := model.OutboxEvent{
Type: "member_removed",
SiteID: h.siteID,
DestSiteID: h.siteID,
Payload: memberEvtData,
Timestamp: now.UnixMilli(),
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
}

// System message
sysMsgUser := model.SysMsgUser{
Account: user.Account,
Expand Down Expand Up @@ -469,6 +484,19 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR
if err := h.publish(ctx, subject.MemberEvent(req.RoomID), memberEvtData, ""); err != nil {
slog.Error("member event publish failed", "error", err, "roomID", req.RoomID)
}

inboxOutbox := model.OutboxEvent{
Type: "member_removed",
SiteID: h.siteID,
DestSiteID: h.siteID,
Payload: memberEvtData,
Timestamp: now.UnixMilli(),
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
}
}

// System message
Expand Down Expand Up @@ -741,6 +769,21 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
slog.Error("member add event publish failed", "error", err, "roomID", req.RoomID)
}

if len(actualAccounts) > 0 {
inboxOutbox := model.OutboxEvent{
Type: "member_added",
SiteID: room.SiteID,
DestSiteID: room.SiteID,
Payload: memberAddData,
Timestamp: now.UnixMilli(),
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, outboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil {
slog.Error("local inbox member_added publish failed", "error", err, "roomID", req.RoomID)
}
}

membersAdded := model.MembersAdded{
Individuals: actualAccounts,
Orgs: req.Orgs,
Expand Down
Loading
Loading