Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broadcast-worker/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion inbox-worker/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion message-gatekeeper/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion message-worker/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion notification-worker/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/natsutil/request_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package natsutil

import (
"context"
"log/slog"

"github.com/nats-io/nats.go"
)
Expand Down Expand Up @@ -57,3 +58,15 @@ func NewMsg(ctx context.Context, subj string, data []byte) *nats.Msg {
Header: HeaderForContext(ctx),
}
}

// OutboxDedupID composes a JetStream Nats-Msg-Id as base+":"+destSiteID. base
// is the X-Request-ID from ctx; falls back to payloadSeed when ctx carries no
// request ID, with a warn log so partial-deployment cases are observable.
func OutboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string {
base := RequestIDFromContext(ctx)
if base == "" {
slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID)
base = payloadSeed
}
return base + ":" + destSiteID
}
10 changes: 8 additions & 2 deletions pkg/stream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ type ConsumerSettings struct {

// DurableConsumerDefaults returns a ConsumerConfig populated from the
// supplied ConsumerSettings plus the project-wide architectural
// invariants (AckPolicy=Explicit, DeliverPolicy=New).
// invariants (AckPolicy=Explicit, DeliverPolicy=All).
//
// Callers MUST set Durable. Callers MAY set FilterSubjects to scope the
// consumer to a subset of the stream's subjects.
//
// DeliverPolicy=All so a freshly-created durable (new deploy, new site,
// or a deleted-and-recreated durable) replays the stream from the start.
// search-sync-worker's MV rebuild and inbox-worker's federated catch-up
// both depend on this; for streams with no historical data (steady-state
// new sites) All and New are equivalent.
//
// DeliverPolicy is honored only at consumer creation. Updating an
// existing durable via js.CreateOrUpdateConsumer does not reset its
// cursor position.
func DurableConsumerDefaults(s ConsumerSettings) jetstream.ConsumerConfig {
return jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
DeliverPolicy: jetstream.DeliverNewPolicy,
DeliverPolicy: jetstream.DeliverAllPolicy,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

AckWait: s.AckWait,
MaxDeliver: s.MaxDeliver,
MaxWaiting: s.MaxWaiting,
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestDurableConsumerDefaults(t *testing.T) {
cc := stream.DurableConsumerDefaults(s)

assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy, "AckPolicy invariant")
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy, "DeliverPolicy invariant")
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy, "DeliverPolicy invariant")
assert.Equal(t, 45*time.Second, cc.AckWait)
assert.Equal(t, 3, cc.MaxDeliver)
assert.Equal(t, 256, cc.MaxWaiting)
Expand All @@ -37,7 +37,7 @@ func TestDurableConsumerDefaults(t *testing.T) {
cc := stream.DurableConsumerDefaults(stream.ConsumerSettings{})

assert.Equal(t, jetstream.AckExplicitPolicy, cc.AckPolicy)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
assert.Zero(t, cc.AckWait)
assert.Zero(t, cc.MaxDeliver)
assert.Zero(t, cc.MaxWaiting)
Expand Down
2 changes: 1 addition & 1 deletion room-worker/consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBuildConsumerConfig(t *testing.T) {
assert.Equal(t, 30*time.Second, cc.AckWait)
assert.Equal(t, 5, cc.MaxDeliver)
assert.Equal(t, 512, cc.MaxWaiting)
assert.Equal(t, jetstream.DeliverNewPolicy, cc.DeliverPolicy)
assert.Equal(t, jetstream.DeliverAllPolicy, cc.DeliverPolicy)
})

t.Run("overrides flow through", func(t *testing.T) {
Expand Down
83 changes: 65 additions & 18 deletions room-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ func NewHandler(store SubscriptionStore, siteID string, publish PublishFunc) *Ha
return &Handler{store: store, siteID: siteID, publish: publish}
}

// outboxDedupID composes Nats-Msg-Id as base+":"+destSiteID; base is X-Request-ID from ctx, falling back to payloadSeed when absent (partial-deployment safety).
func outboxDedupID(ctx context.Context, destSiteID, payloadSeed string) string {
base := natsutil.RequestIDFromContext(ctx)
if base == "" {
slog.Warn("missing X-Request-ID; falling back to payload-derived outbox dedup base", "destSiteID", destSiteID)
base = payloadSeed
}
return base + ":" + destSiteID
}

// messageDedupSeed returns the X-Request-ID from ctx, or payloadSeed when absent (partial-deployment safety, with a warn log).
func messageDedupSeed(ctx context.Context, handler, roomID, payloadSeed string) string {
if seed := natsutil.RequestIDFromContext(ctx); seed != "" {
Expand Down Expand Up @@ -236,7 +226,7 @@ func (h *Handler) processRoleUpdate(ctx context.Context, data []byte) error {
}
outboxSubj := subject.Outbox(h.siteID, user.SiteID, "role_updated")
payloadSeed := fmt.Sprintf("%s:%s:%s:%d", req.RoomID, req.Account, req.NewRole, req.Timestamp)
dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed)
dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed)
if err := h.publish(ctx, outboxSubj, outboxData, dedupID); err != nil {
return fmt.Errorf("publish outbox: %w", err)
}
Expand Down Expand Up @@ -342,7 +332,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
}

Expand Down Expand Up @@ -388,7 +378,7 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove
}
outboxData, _ := json.Marshal(outbox)
payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.Account, req.Timestamp)
dedupID := outboxDedupID(ctx, user.SiteID, payloadSeed)
dedupID := natsutil.OutboxDedupID(ctx, user.SiteID, payloadSeed)
if err := h.publish(ctx, subject.Outbox(h.siteID, user.SiteID, "member_removed"), outboxData, dedupID); err != nil {
return fmt.Errorf("outbox publish to %s: %w", user.SiteID, err)
}
Expand Down Expand Up @@ -484,7 +474,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, outboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), inboxData, natsutil.OutboxDedupID(ctx, h.siteID, inboxSeed)); err != nil {
slog.Error("local inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
}
}
Expand Down Expand Up @@ -539,7 +529,7 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR
}
outboxData, _ := json.Marshal(outbox)
payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.OrgID, req.Timestamp)
dedupID := outboxDedupID(ctx, destSiteID, payloadSeed)
dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed)
if err := h.publish(ctx, subject.Outbox(h.siteID, destSiteID, "member_removed"), outboxData, dedupID); err != nil {
return fmt.Errorf("outbox publish to %s: %w", destSiteID, err)
}
Expand Down Expand Up @@ -769,7 +759,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
}
inboxData, _ := json.Marshal(inboxOutbox)
inboxSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, outboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil {
if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), inboxData, natsutil.OutboxDedupID(ctx, room.SiteID, inboxSeed)); err != nil {
slog.Error("local inbox member_added publish failed", "error", err, "roomID", req.RoomID)
}
}
Expand Down Expand Up @@ -830,7 +820,7 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) (err error
}
outboxData, _ := json.Marshal(outbox)
payloadSeed := fmt.Sprintf("%s:%s:%d", req.RoomID, req.RequesterAccount, req.Timestamp)
dedupID := outboxDedupID(ctx, destSiteID, payloadSeed)
dedupID := natsutil.OutboxDedupID(ctx, destSiteID, payloadSeed)
if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), outboxData, dedupID); err != nil {
return fmt.Errorf("outbox publish to %s failed: %w", destSiteID, err)
}
Expand Down Expand Up @@ -1121,6 +1111,34 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq
}
}

accounts := make([]string, 0, len(subs))
for _, sub := range subs {
accounts = append(accounts, sub.User.Account)
}
inner := model.MemberAddEvent{
Type: model.OutboxMemberAdded,
RoomID: room.ID,
RoomName: room.Name,
Accounts: accounts,
SiteID: room.SiteID,
JoinedAt: req.Timestamp,
HistorySharedSince: nil,
Timestamp: now.UnixMilli(),
}
innerData, _ := json.Marshal(inner)
outbox := model.OutboxEvent{
Type: model.OutboxMemberAdded,
SiteID: room.SiteID,
DestSiteID: room.SiteID,
Payload: innerData,
Timestamp: now.UnixMilli(),
}
outboxData, _ := json.Marshal(outbox)
payloadSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp)
if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), outboxData, natsutil.OutboxDedupID(ctx, room.SiteID, payloadSeed)); err != nil {
slog.Error("local inbox member_added publish failed", "error", err, "roomID", room.ID, "requestID", requestID)
}

// Task 37: outbox per remote site
remoteSiteAccounts := map[string][]string{}
for _, u := range allUsers {
Expand Down Expand Up @@ -1157,6 +1175,35 @@ func (h *Handler) finishCreateRoom(ctx context.Context, req *model.CreateRoomReq
if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxTypeRoomCreated), eData, requestID+":"+destSiteID); err != nil {
return fmt.Errorf("publish room_created outbox to %s: %w", destSiteID, err)
}

// Cross-site member_added so the remote site's search-sync-worker
// updates its user-room/spotlight MV — mirrors processAddMembers'
// federation. inbox-worker still consumes the room_created above to
// build correctly-typed Subscription rows; this event only feeds the
// search index.
memberEvt := model.MemberAddEvent{
Comment thread
mliu33 marked this conversation as resolved.
Type: model.OutboxMemberAdded,
RoomID: room.ID,
RoomName: room.Name,
Accounts: accounts,
SiteID: room.SiteID,
JoinedAt: req.Timestamp,
HistorySharedSince: nil,
Timestamp: now.UnixMilli(),
}
memberData, _ := json.Marshal(memberEvt)
memberEnvelope := model.OutboxEvent{
Type: model.OutboxMemberAdded,
SiteID: room.SiteID,
DestSiteID: destSiteID,
Payload: memberData,
Timestamp: now.UnixMilli(),
}
memberOutboxData, _ := json.Marshal(memberEnvelope)
memberSeed := fmt.Sprintf("%s:%s:%d", room.ID, requester.Account, req.Timestamp)
if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, model.OutboxMemberAdded), memberOutboxData, natsutil.OutboxDedupID(ctx, destSiteID, memberSeed)); err != nil {
return fmt.Errorf("publish member_added outbox to %s: %w", destSiteID, err)
}
}

return nil
Expand Down Expand Up @@ -1439,7 +1486,7 @@ func (h *Handler) publishSyncDMOutbox(ctx context.Context, room *model.Room, req
return h.publish(ctx,
subject.Outbox(room.SiteID, other.SiteID, model.OutboxTypeRoomCreated),
eData,
outboxDedupID(ctx, other.SiteID, payloadSeed),
natsutil.OutboxDedupID(ctx, other.SiteID, payloadSeed),
)
}

Expand Down
Loading
Loading