From 679ca988feb1f584575e349d86893f8e0930e2d5 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Wed, 13 May 2026 16:27:10 -0500 Subject: [PATCH 1/5] local changes --- .../stream_processing/stream_processing.go | 70 ++++ internal/driverutil/operation.go | 10 + .../streamprocessing/streamprocessing_test.go | 109 ++++++ mongo/options/streamprocessingoptions.go | 294 ++++++++++++++++ mongo/options/streamprocessingoptions_test.go | 85 +++++ mongo/streamprocessing.go | 102 ++++++ mongo/streamprocessing_processor.go | 231 +++++++++++++ mongo/streamprocessing_processors.go | 170 ++++++++++ mongo/streamprocessing_samples.go | 118 +++++++ mongo/streamprocessing_test.go | 69 ++++ streamprocessing/doc.go | 37 ++ streamprocessing/streamprocessing.go | 47 +++ .../operation/create_stream_processor.go | 228 +++++++++++++ .../driver/operation/drop_stream_processor.go | 146 ++++++++ .../get_more_sample_stream_processor.go | 212 ++++++++++++ .../driver/operation/get_stream_processor.go | 183 ++++++++++ .../operation/get_stream_processor_stats.go | 201 +++++++++++ .../start_sample_stream_processor.go | 181 ++++++++++ .../operation/start_stream_processor.go | 298 ++++++++++++++++ .../driver/operation/stop_stream_processor.go | 146 ++++++++ .../driver/operation/stream_processor_test.go | 321 ++++++++++++++++++ 21 files changed, 3258 insertions(+) create mode 100644 examples/stream_processing/stream_processing.go create mode 100644 internal/integration/streamprocessing/streamprocessing_test.go create mode 100644 mongo/options/streamprocessingoptions.go create mode 100644 mongo/options/streamprocessingoptions_test.go create mode 100644 mongo/streamprocessing.go create mode 100644 mongo/streamprocessing_processor.go create mode 100644 mongo/streamprocessing_processors.go create mode 100644 mongo/streamprocessing_samples.go create mode 100644 mongo/streamprocessing_test.go create mode 100644 streamprocessing/doc.go create mode 100644 streamprocessing/streamprocessing.go create mode 100644 x/mongo/driver/operation/create_stream_processor.go create mode 100644 x/mongo/driver/operation/drop_stream_processor.go create mode 100644 x/mongo/driver/operation/get_more_sample_stream_processor.go create mode 100644 x/mongo/driver/operation/get_stream_processor.go create mode 100644 x/mongo/driver/operation/get_stream_processor_stats.go create mode 100644 x/mongo/driver/operation/start_sample_stream_processor.go create mode 100644 x/mongo/driver/operation/start_stream_processor.go create mode 100644 x/mongo/driver/operation/stop_stream_processor.go create mode 100644 x/mongo/driver/operation/stream_processor_test.go diff --git a/examples/stream_processing/stream_processing.go b/examples/stream_processing/stream_processing.go new file mode 100644 index 0000000000..48af624ba3 --- /dev/null +++ b/examples/stream_processing/stream_processing.go @@ -0,0 +1,70 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +// Example use of the streamprocessing client. Set MONGODB_STREAM_PROCESSING_URI +// to a workspace endpoint (mongodb://atlas-stream-...) and run with: +// +// MONGODB_STREAM_PROCESSING_URI=... go run ./examples/stream_processing +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/streamprocessing" +) + +func main() { + uri := os.Getenv("MONGODB_STREAM_PROCESSING_URI") + if uri == "" { + log.Fatal("set MONGODB_STREAM_PROCESSING_URI to a workspace endpoint") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + client, err := streamprocessing.Connect(options.Client().ApplyURI(uri)) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer client.Disconnect(context.Background()) + + sps := client.StreamProcessors() + name := fmt.Sprintf("example-%d", time.Now().Unix()) + + pipeline := []bson.D{ + {{Key: "$source", Value: bson.D{{Key: "connectionName", Value: "sample_stream_solar"}}}}, + } + + if err := sps.Create(ctx, name, pipeline); err != nil { + log.Fatalf("create: %v", err) + } + defer func() { + _ = sps.Get(name).Drop(context.Background()) + }() + + sp := sps.Get(name) + if err := sp.Start(ctx, nil); err != nil { + log.Fatalf("start: %v", err) + } + + time.Sleep(2 * time.Second) + + samples, err := sp.GetStreamProcessorSamples(ctx, options.GetStreamProcessorSamples().SetLimit(5)) + if err != nil { + log.Fatalf("sample: %v", err) + } + fmt.Printf("got %d sample document(s); cursor=%d\n", len(samples.Documents), samples.CursorID) + + if err := sp.Stop(ctx); err != nil { + log.Fatalf("stop: %v", err) + } +} diff --git a/internal/driverutil/operation.go b/internal/driverutil/operation.go index 74142a56e8..e12f16ec8e 100644 --- a/internal/driverutil/operation.go +++ b/internal/driverutil/operation.go @@ -35,6 +35,16 @@ const ( ListDatabasesOp = "listDatabases" // ListDatabasesOp is the name for listing databases UpdateOp = "update" // UpdateOp is the name for updating BulkWriteOp = "bulkWrite" // BulkWriteOp is the name for client-level bulk write + + // Atlas Stream Processing commands. + CreateStreamProcessorOp = "createStreamProcessor" // CreateStreamProcessorOp is the name for creating a stream processor + StartStreamProcessorOp = "startStreamProcessor" // StartStreamProcessorOp is the name for starting a stream processor + StopStreamProcessorOp = "stopStreamProcessor" // StopStreamProcessorOp is the name for stopping a stream processor + DropStreamProcessorOp = "dropStreamProcessor" // DropStreamProcessorOp is the name for dropping a stream processor + GetStreamProcessorOp = "getStreamProcessor" // GetStreamProcessorOp is the name for fetching a stream processor's info + GetStreamProcessorStatsOp = "getStreamProcessorStats" // GetStreamProcessorStatsOp is the name for fetching a stream processor's runtime stats + StartSampleStreamProcessorOp = "startSampleStreamProcessor" // StartSampleStreamProcessorOp is the name for opening a sample cursor on a stream processor + GetMoreSampleStreamProcessorOp = "getMoreSampleStreamProcessor" // GetMoreSampleStreamProcessorOp is the name for fetching the next batch from a sample cursor ) // CalculateMaxTimeMS calculates the maxTimeMS value to send to the server diff --git a/internal/integration/streamprocessing/streamprocessing_test.go b/internal/integration/streamprocessing/streamprocessing_test.go new file mode 100644 index 0000000000..ecb2c711bf --- /dev/null +++ b/internal/integration/streamprocessing/streamprocessing_test.go @@ -0,0 +1,109 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +// Package streamprocessing_test contains env-gated integration tests for the +// streamprocessing/ client. These tests do not run unless +// MONGODB_STREAM_PROCESSING_URI is set, since they require a real Atlas +// Stream Processing workspace. +package streamprocessing_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/assert" + "go.mongodb.org/mongo-driver/v2/internal/require" + "go.mongodb.org/mongo-driver/v2/internal/uuid" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/streamprocessing" +) + +const workspaceURIEnv = "MONGODB_STREAM_PROCESSING_URI" + +func skipIfNoWorkspaceURI(t *testing.T) string { + t.Helper() + uri := os.Getenv(workspaceURIEnv) + if uri == "" { + t.Skipf("%s is not set; skipping ASP integration test", workspaceURIEnv) + } + return uri +} + +func newClient(t *testing.T, uri string) *streamprocessing.Client { + t.Helper() + c, err := streamprocessing.Connect(options.Client().ApplyURI(uri)) + require.NoError(t, err) + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _ = c.Disconnect(ctx) + }) + return c +} + +func uniqueProcessorName(t *testing.T) string { + t.Helper() + id, err := uuid.New() + require.NoError(t, err) + return fmt.Sprintf("driver-test-%s", id) +} + +// TestEndToEnd runs the full lifecycle of a stream processor against a real +// workspace endpoint. +func TestEndToEnd(t *testing.T) { + uri := skipIfNoWorkspaceURI(t) + client := newClient(t, uri) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + sps := client.StreamProcessors() + name := uniqueProcessorName(t) + t.Logf("using processor name %q", name) + + pipeline := []bson.D{ + {{Key: "$source", Value: bson.D{{Key: "connectionName", Value: "sample_stream_solar"}}}}, + } + + require.NoError(t, sps.Create(ctx, name, pipeline)) + t.Cleanup(func() { + dropCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _ = sps.Get(name).Drop(dropCtx) + }) + + info, err := sps.GetInfo(ctx, name) + require.NoError(t, err) + assert.Equal(t, name, info.Name) + + sp := sps.Get(name) + require.NoError(t, sp.Start(ctx, nil)) + + // Wait briefly for the processor to start producing samples. + time.Sleep(2 * time.Second) + + stats, err := sp.Stats(ctx, nil) + require.NoError(t, err) + require.Greater(t, len(stats), 0) + + samples, err := sp.GetStreamProcessorSamples(ctx, options.GetStreamProcessorSamples().SetLimit(10)) + require.NoError(t, err) + t.Logf("first sample batch: cursor=%d documents=%d", samples.CursorID, len(samples.Documents)) + + if samples.CursorID != 0 { + next, err := sp.GetStreamProcessorSamples(ctx, + options.GetStreamProcessorSamples().SetCursorID(samples.CursorID).SetBatchSize(5)) + require.NoError(t, err) + t.Logf("second sample batch: cursor=%d documents=%d", next.CursorID, len(next.Documents)) + } + + require.NoError(t, sp.Stop(ctx)) + require.NoError(t, sp.Drop(ctx)) +} diff --git a/mongo/options/streamprocessingoptions.go b/mongo/options/streamprocessingoptions.go new file mode 100644 index 0000000000..e30ef83f90 --- /dev/null +++ b/mongo/options/streamprocessingoptions.go @@ -0,0 +1,294 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package options + +import "go.mongodb.org/mongo-driver/v2/bson" + +// CreateStreamProcessorOptions represents arguments that can be used to +// configure a CreateStreamProcessor operation. +// +// See corresponding setter methods for documentation. +type CreateStreamProcessorOptions struct { + DLQ bson.Raw + StreamMetaFieldName *string + Tier *string + FailoverEnabled *bool +} + +// CreateStreamProcessorOptionsBuilder configures the createStreamProcessor command. +type CreateStreamProcessorOptionsBuilder struct { + Opts []func(*CreateStreamProcessorOptions) error +} + +// CreateStreamProcessor creates a new CreateStreamProcessorOptionsBuilder. +func CreateStreamProcessor() *CreateStreamProcessorOptionsBuilder { + return &CreateStreamProcessorOptionsBuilder{} +} + +// List returns a list of CreateStreamProcessorOptions setter functions. +func (c *CreateStreamProcessorOptionsBuilder) List() []func(*CreateStreamProcessorOptions) error { + return c.Opts +} + +// SetDLQ sets the dead-letter-queue configuration document. +func (c *CreateStreamProcessorOptionsBuilder) SetDLQ(dlq bson.Raw) *CreateStreamProcessorOptionsBuilder { + c.Opts = append(c.Opts, func(o *CreateStreamProcessorOptions) error { + o.DLQ = dlq + return nil + }) + return c +} + +// SetStreamMetaFieldName sets the field name to use for stream metadata. +func (c *CreateStreamProcessorOptionsBuilder) SetStreamMetaFieldName(name string) *CreateStreamProcessorOptionsBuilder { + c.Opts = append(c.Opts, func(o *CreateStreamProcessorOptions) error { + o.StreamMetaFieldName = &name + return nil + }) + return c +} + +// SetTier sets the compute tier for the new processor. +func (c *CreateStreamProcessorOptionsBuilder) SetTier(tier string) *CreateStreamProcessorOptionsBuilder { + c.Opts = append(c.Opts, func(o *CreateStreamProcessorOptions) error { + o.Tier = &tier + return nil + }) + return c +} + +// SetFailoverEnabled sets whether failover should be enabled. +func (c *CreateStreamProcessorOptionsBuilder) SetFailoverEnabled(b bool) *CreateStreamProcessorOptionsBuilder { + c.Opts = append(c.Opts, func(o *CreateStreamProcessorOptions) error { + o.FailoverEnabled = &b + return nil + }) + return c +} + +// StreamProcessorFailoverOptions describes a failover request attached to a +// startStreamProcessor command. +type StreamProcessorFailoverOptions struct { + Region string // required when failover is sent + Mode *string // "GRACEFUL" (default) or "FORCED" + DryRun *bool +} + +// StreamProcessorFailoverOptionsBuilder builds a StreamProcessorFailoverOptions value. +type StreamProcessorFailoverOptionsBuilder struct { + Opts []func(*StreamProcessorFailoverOptions) error +} + +// StreamProcessorFailover creates a new StreamProcessorFailoverOptionsBuilder. +func StreamProcessorFailover() *StreamProcessorFailoverOptionsBuilder { + return &StreamProcessorFailoverOptionsBuilder{} +} + +// List returns a list of StreamProcessorFailoverOptions setter functions. +func (b *StreamProcessorFailoverOptionsBuilder) List() []func(*StreamProcessorFailoverOptions) error { + return b.Opts +} + +// SetRegion sets the target region for failover. Required when failover is sent. +func (b *StreamProcessorFailoverOptionsBuilder) SetRegion(region string) *StreamProcessorFailoverOptionsBuilder { + b.Opts = append(b.Opts, func(o *StreamProcessorFailoverOptions) error { + o.Region = region + return nil + }) + return b +} + +// SetMode sets the failover mode. Valid values: "GRACEFUL" (default), "FORCED". +func (b *StreamProcessorFailoverOptionsBuilder) SetMode(mode string) *StreamProcessorFailoverOptionsBuilder { + b.Opts = append(b.Opts, func(o *StreamProcessorFailoverOptions) error { + o.Mode = &mode + return nil + }) + return b +} + +// SetDryRun configures whether the failover request should be validated +// without being executed. +func (b *StreamProcessorFailoverOptionsBuilder) SetDryRun(v bool) *StreamProcessorFailoverOptionsBuilder { + b.Opts = append(b.Opts, func(o *StreamProcessorFailoverOptions) error { + o.DryRun = &v + return nil + }) + return b +} + +// StartStreamProcessorOptions represents arguments that can be used to +// configure a StartStreamProcessor operation. +// +// The spec reserves a StartAfter field for future use; drivers MUST NOT send +// it to the wire today. It is intentionally absent from this struct. +type StartStreamProcessorOptions struct { + Workers *int32 + ClearCheckpoints *bool + StartAtOperationTime *bson.Timestamp + Tier *string + EnableAutoScaling *bool + Failover *StreamProcessorFailoverOptions +} + +// StartStreamProcessorOptionsBuilder configures the startStreamProcessor command. +type StartStreamProcessorOptionsBuilder struct { + Opts []func(*StartStreamProcessorOptions) error +} + +// StartStreamProcessor creates a new StartStreamProcessorOptionsBuilder. +func StartStreamProcessor() *StartStreamProcessorOptionsBuilder { + return &StartStreamProcessorOptionsBuilder{} +} + +// List returns a list of StartStreamProcessorOptions setter functions. +func (s *StartStreamProcessorOptionsBuilder) List() []func(*StartStreamProcessorOptions) error { + return s.Opts +} + +// SetWorkers sets the workers field on the command. +func (s *StartStreamProcessorOptionsBuilder) SetWorkers(n int32) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.Workers = &n + return nil + }) + return s +} + +// SetClearCheckpoints sets the options.clearCheckpoints flag. +func (s *StartStreamProcessorOptionsBuilder) SetClearCheckpoints(b bool) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.ClearCheckpoints = &b + return nil + }) + return s +} + +// SetStartAtOperationTime sets the options.startAtOperationTime BSON timestamp. +func (s *StartStreamProcessorOptionsBuilder) SetStartAtOperationTime(ts bson.Timestamp) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.StartAtOperationTime = &ts + return nil + }) + return s +} + +// SetTier sets the options.tier value. Valid values: "SP2", "SP5", "SP10", +// "SP30", "SP50". +func (s *StartStreamProcessorOptionsBuilder) SetTier(tier string) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.Tier = &tier + return nil + }) + return s +} + +// SetEnableAutoScaling sets the options.enableAutoScaling flag. +func (s *StartStreamProcessorOptionsBuilder) SetEnableAutoScaling(b bool) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.EnableAutoScaling = &b + return nil + }) + return s +} + +// SetFailover attaches a failover sub-document to the command. +func (s *StartStreamProcessorOptionsBuilder) SetFailover(f *StreamProcessorFailoverOptions) *StartStreamProcessorOptionsBuilder { + s.Opts = append(s.Opts, func(o *StartStreamProcessorOptions) error { + o.Failover = f + return nil + }) + return s +} + +// GetStreamProcessorStatsOptions represents arguments that can be used to +// configure a GetStreamProcessorStats operation. +type GetStreamProcessorStatsOptions struct { + Verbose *bool +} + +// GetStreamProcessorStatsOptionsBuilder configures the getStreamProcessorStats command. +type GetStreamProcessorStatsOptionsBuilder struct { + Opts []func(*GetStreamProcessorStatsOptions) error +} + +// GetStreamProcessorStats creates a new GetStreamProcessorStatsOptionsBuilder. +func GetStreamProcessorStats() *GetStreamProcessorStatsOptionsBuilder { + return &GetStreamProcessorStatsOptionsBuilder{} +} + +// List returns a list of GetStreamProcessorStatsOptions setter functions. +func (g *GetStreamProcessorStatsOptionsBuilder) List() []func(*GetStreamProcessorStatsOptions) error { + return g.Opts +} + +// SetVerbose sets the options.verbose flag. +func (g *GetStreamProcessorStatsOptionsBuilder) SetVerbose(b bool) *GetStreamProcessorStatsOptionsBuilder { + g.Opts = append(g.Opts, func(o *GetStreamProcessorStatsOptions) error { + o.Verbose = &b + return nil + }) + return g +} + +// GetStreamProcessorSamplesOptions represents arguments that can be used to +// configure a GetStreamProcessorSamples call. +// +// If CursorID is absent or zero, a new sample cursor is opened via +// startSampleStreamProcessor (and a Limit may be provided). If non-zero, the +// next batch is fetched via getMoreSampleStreamProcessor (and a BatchSize may +// be provided). +type GetStreamProcessorSamplesOptions struct { + CursorID *int64 + Limit *int32 + BatchSize *int32 +} + +// GetStreamProcessorSamplesOptionsBuilder configures a GetStreamProcessorSamples call. +type GetStreamProcessorSamplesOptionsBuilder struct { + Opts []func(*GetStreamProcessorSamplesOptions) error +} + +// GetStreamProcessorSamples creates a new GetStreamProcessorSamplesOptionsBuilder. +func GetStreamProcessorSamples() *GetStreamProcessorSamplesOptionsBuilder { + return &GetStreamProcessorSamplesOptionsBuilder{} +} + +// List returns a list of GetStreamProcessorSamplesOptions setter functions. +func (g *GetStreamProcessorSamplesOptionsBuilder) List() []func(*GetStreamProcessorSamplesOptions) error { + return g.Opts +} + +// SetCursorID sets the cursor ID from a previous call. If absent or zero, a +// new sample cursor is opened. +func (g *GetStreamProcessorSamplesOptionsBuilder) SetCursorID(id int64) *GetStreamProcessorSamplesOptionsBuilder { + g.Opts = append(g.Opts, func(o *GetStreamProcessorSamplesOptions) error { + o.CursorID = &id + return nil + }) + return g +} + +// SetLimit sets the maximum number of documents to sample. Only sent on the +// initial call (when CursorID is absent or zero); ignored on subsequent calls. +func (g *GetStreamProcessorSamplesOptionsBuilder) SetLimit(n int32) *GetStreamProcessorSamplesOptionsBuilder { + g.Opts = append(g.Opts, func(o *GetStreamProcessorSamplesOptions) error { + o.Limit = &n + return nil + }) + return g +} + +// SetBatchSize sets the desired batch size. Only sent on subsequent calls +// (when CursorID is non-zero); ignored on the initial call. +func (g *GetStreamProcessorSamplesOptionsBuilder) SetBatchSize(n int32) *GetStreamProcessorSamplesOptionsBuilder { + g.Opts = append(g.Opts, func(o *GetStreamProcessorSamplesOptions) error { + o.BatchSize = &n + return nil + }) + return g +} diff --git a/mongo/options/streamprocessingoptions_test.go b/mongo/options/streamprocessingoptions_test.go new file mode 100644 index 0000000000..491587b3dc --- /dev/null +++ b/mongo/options/streamprocessingoptions_test.go @@ -0,0 +1,85 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package options_test + +import ( + "testing" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/assert" + "go.mongodb.org/mongo-driver/v2/internal/mongoutil" + "go.mongodb.org/mongo-driver/v2/internal/require" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +func TestCreateStreamProcessorOptions_Builder(t *testing.T) { + dlq := bson.Raw([]byte{0x05, 0x00, 0x00, 0x00, 0x00}) // empty BSON doc + b := options.CreateStreamProcessor(). + SetDLQ(dlq). + SetStreamMetaFieldName("_meta"). + SetTier("SP10"). + SetFailoverEnabled(true) + + args, err := mongoutil.NewOptions[options.CreateStreamProcessorOptions](b) + require.NoError(t, err) + assert.Equal(t, dlq, args.DLQ) + require.NotNil(t, args.StreamMetaFieldName) + assert.Equal(t, "_meta", *args.StreamMetaFieldName) + require.NotNil(t, args.Tier) + assert.Equal(t, "SP10", *args.Tier) + require.NotNil(t, args.FailoverEnabled) + assert.True(t, *args.FailoverEnabled) +} + +func TestStartStreamProcessorOptions_Builder(t *testing.T) { + fo := options.StreamProcessorFailover().SetRegion("us-east-1").SetMode("FORCED").SetDryRun(true) + foArgs, err := mongoutil.NewOptions[options.StreamProcessorFailoverOptions](fo) + require.NoError(t, err) + + b := options.StartStreamProcessor(). + SetWorkers(3). + SetClearCheckpoints(true). + SetStartAtOperationTime(bson.Timestamp{T: 100, I: 1}). + SetTier("SP30"). + SetEnableAutoScaling(true). + SetFailover(foArgs) + + args, err := mongoutil.NewOptions[options.StartStreamProcessorOptions](b) + require.NoError(t, err) + require.NotNil(t, args.Workers) + assert.Equal(t, int32(3), *args.Workers) + require.NotNil(t, args.ClearCheckpoints) + assert.True(t, *args.ClearCheckpoints) + require.NotNil(t, args.StartAtOperationTime) + assert.Equal(t, uint32(100), args.StartAtOperationTime.T) + require.NotNil(t, args.Tier) + assert.Equal(t, "SP30", *args.Tier) + require.NotNil(t, args.EnableAutoScaling) + assert.True(t, *args.EnableAutoScaling) + require.NotNil(t, args.Failover) + assert.Equal(t, "us-east-1", args.Failover.Region) +} + +func TestGetStreamProcessorStatsOptions_Builder(t *testing.T) { + b := options.GetStreamProcessorStats().SetVerbose(true) + args, err := mongoutil.NewOptions[options.GetStreamProcessorStatsOptions](b) + require.NoError(t, err) + require.NotNil(t, args.Verbose) + assert.True(t, *args.Verbose) +} + +func TestGetStreamProcessorSamplesOptions_Builder(t *testing.T) { + b := options.GetStreamProcessorSamples().SetCursorID(42).SetLimit(100).SetBatchSize(20) + args, err := mongoutil.NewOptions[options.GetStreamProcessorSamplesOptions](b) + require.NoError(t, err) + require.NotNil(t, args.CursorID) + assert.Equal(t, int64(42), *args.CursorID) + require.NotNil(t, args.Limit) + assert.Equal(t, int32(100), *args.Limit) + require.NotNil(t, args.BatchSize) + assert.Equal(t, int32(20), *args.BatchSize) +} diff --git a/mongo/streamprocessing.go b/mongo/streamprocessing.go new file mode 100644 index 0000000000..134e883965 --- /dev/null +++ b/mongo/streamprocessing.go @@ -0,0 +1,102 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + "crypto/tls" + "strings" + + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" +) + +// streamProcessingAdminDB is the database name used for routing all Atlas +// Stream Processing commands. Workspace endpoint routing is performed at the +// network layer; the database name is not used as a routing key, but the +// driver sends commands against "admin" for consistency with mongosh and the +// authentication database. +const streamProcessingAdminDB = "admin" + +// StreamProcessingClient is a handle for connecting to an Atlas Stream +// Processing workspace. It exposes the commands defined by the Atlas Stream +// Processing wire protocol. +// +// A StreamProcessingClient is distinct from a Client: it enforces TLS, +// defaults authSource to "admin", and exposes only commands meaningful for +// stream processing workspaces. +// +// Most callers should obtain a StreamProcessingClient via the +// streamprocessing.Connect factory, which re-exports this type. +type StreamProcessingClient struct { + client *Client +} + +// ConnectStreamProcessing creates a new StreamProcessingClient configured for +// an Atlas Stream Processing workspace endpoint. +// +// ConnectStreamProcessing enforces the following workspace requirements: +// +// - TLS is always enabled. If the caller did not configure a TLS config, +// a default *tls.Config is set. +// - authSource defaults to "admin" when credentials are provided without +// an explicit auth source. +// +// Like [Connect], ConnectStreamProcessing returns without verifying that the +// workspace is reachable; call Ping to verify connectivity. +func ConnectStreamProcessing(opts ...*options.ClientOptions) (*StreamProcessingClient, error) { + merged := options.MergeClientOptions(opts...) + applyStreamProcessingDefaults(merged) + + c, err := Connect(merged) + if err != nil { + return nil, err + } + return &StreamProcessingClient{client: c}, nil +} + +// applyStreamProcessingDefaults enforces TLS and authSource=admin on the +// merged ClientOptions. +func applyStreamProcessingDefaults(opts *options.ClientOptions) { + if opts.TLSConfig == nil { + opts.TLSConfig = &tls.Config{} + } + if opts.Auth != nil && opts.Auth.AuthSource == "" { + opts.Auth.AuthSource = streamProcessingAdminDB + } +} + +// IsStreamProcessingHost reports whether the provided hostname looks like an +// Atlas Stream Processing workspace endpoint. Detection is advisory only; the +// driver does not require it. +func IsStreamProcessingHost(host string) bool { + return strings.HasPrefix(host, "atlas-stream-") && + strings.HasSuffix(host, ".a.query.mongodb.net") +} + +// Client returns the underlying *Client used by this StreamProcessingClient. +// This is exposed for advanced cases (e.g. running unsupported commands via +// RunCommand); ordinary code should use StreamProcessors instead. +func (spc *StreamProcessingClient) Client() *Client { return spc.client } + +// StreamProcessors returns a handle for managing stream processors in this +// workspace. +func (spc *StreamProcessingClient) StreamProcessors() *StreamProcessors { + return &StreamProcessors{client: spc.client} +} + +// Disconnect closes the underlying connections, waiting for in-flight +// operations to complete. After Disconnect, the StreamProcessingClient must +// not be used. +func (spc *StreamProcessingClient) Disconnect(ctx context.Context) error { + return spc.client.Disconnect(ctx) +} + +// Ping verifies that the workspace endpoint is reachable. +func (spc *StreamProcessingClient) Ping(ctx context.Context, rp *readpref.ReadPref) error { + return spc.client.Ping(ctx, rp) +} diff --git a/mongo/streamprocessing_processor.go b/mongo/streamprocessing_processor.go new file mode 100644 index 0000000000..40659de9c4 --- /dev/null +++ b/mongo/streamprocessing_processor.go @@ -0,0 +1,231 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + "errors" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/mongoutil" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" +) + +// StreamProcessor is a handle for a specific named stream processor. Holding +// a StreamProcessor does not imply that a processor with the given name +// currently exists on the server. +type StreamProcessor struct { + parent *StreamProcessors + name string +} + +// Name returns the processor name. +func (sp *StreamProcessor) Name() string { return sp.name } + +// Start issues a startStreamProcessor command. The server requires the +// processor to be in the STOPPED or FAILED state. +func (sp *StreamProcessor) Start( + ctx context.Context, + opts ...options.Lister[options.StartStreamProcessorOptions], +) error { + if ctx == nil { + ctx = context.Background() + } + + args, err := mongoutil.NewOptions[options.StartStreamProcessorOptions](opts...) + if err != nil { + return err + } + + sess, release, err := sp.parent.acquireSession(ctx) + if err != nil { + return err + } + defer release() + + op := operation.NewStartStreamProcessor(sp.name) + if args.Workers != nil { + op = op.Workers(*args.Workers) + } + if args.ClearCheckpoints != nil { + op = op.ClearCheckpoints(*args.ClearCheckpoints) + } + if args.StartAtOperationTime != nil { + op = op.StartAtOperationTime(args.StartAtOperationTime.T, args.StartAtOperationTime.I) + } + if args.Tier != nil { + op = op.Tier(*args.Tier) + } + if args.EnableAutoScaling != nil { + op = op.EnableAutoScaling(*args.EnableAutoScaling) + } + if args.Failover != nil { + if args.Failover.Region == "" { + return errors.New("failover requires a target region") + } + op = op.FailoverRegion(args.Failover.Region) + if args.Failover.Mode != nil { + op = op.FailoverMode(*args.Failover.Mode) + } + if args.Failover.DryRun != nil { + op = op.FailoverDryRun(*args.Failover.DryRun) + } + } + + op = op. + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.writeSelector()). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator) + + return op.Execute(ctx) +} + +// Stop issues a stopStreamProcessor command. The processor remains in a +// STOPPED state and can be restarted. +func (sp *StreamProcessor) Stop(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + + sess, release, err := sp.parent.acquireSession(ctx) + if err != nil { + return err + } + defer release() + + op := operation.NewStopStreamProcessor(sp.name). + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.writeSelector()). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator) + + return op.Execute(ctx) +} + +// Drop issues a dropStreamProcessor command. A dropped processor cannot be +// recovered. +func (sp *StreamProcessor) Drop(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } + + sess, release, err := sp.parent.acquireSession(ctx) + if err != nil { + return err + } + defer release() + + op := operation.NewDropStreamProcessor(sp.name). + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.writeSelector()). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator) + + return op.Execute(ctx) +} + +// Stats issues a getStreamProcessorStats command and returns the raw response +// document. The server returns an error if the processor is not running. +func (sp *StreamProcessor) Stats( + ctx context.Context, + opts ...options.Lister[options.GetStreamProcessorStatsOptions], +) (bson.Raw, error) { + if ctx == nil { + ctx = context.Background() + } + + args, err := mongoutil.NewOptions[options.GetStreamProcessorStatsOptions](opts...) + if err != nil { + return nil, err + } + + sess, release, err := sp.parent.acquireSession(ctx) + if err != nil { + return nil, err + } + defer release() + + op := operation.NewGetStreamProcessorStats(sp.name) + if args.Verbose != nil { + op = op.Verbose(*args.Verbose) + } + op = op. + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.readSelector()). + ReadPreference(sp.parent.client.readPreference). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator). + Retry(driver.RetryOncePerCommand) + + if err := op.Execute(ctx); err != nil { + return nil, err + } + return bson.Raw(op.Result()), nil +} + +// StreamProcessorInfo describes a single stream processor as returned by +// getStreamProcessor. +// +// Server-internal fields (tenantID, projectId, processorId, …) are not +// surfaced. Unknown fields are tolerated and ignored. +type StreamProcessorInfo struct { + ID string `bson:"id"` + Name string `bson:"name"` + State string `bson:"state"` + Pipeline []bson.Raw `bson:"pipeline"` + PipelineVersion int32 `bson:"pipelineVersion"` + Tier string `bson:"tier,omitempty"` + DLQ bson.Raw `bson:"dlq,omitempty"` + StreamMetaFieldName string `bson:"streamMetaFieldName,omitempty"` + EnableAutoScaling bool `bson:"enableAutoScaling"` + FailoverEnabled bool `bson:"failoverEnabled"` + ActiveRegion string `bson:"activeRegion"` + WorkspaceDefaultRegion string `bson:"workspaceDefaultRegion"` + LastStateChange *time.Time `bson:"lastStateChange,omitempty"` + LastModifiedAt *time.Time `bson:"lastModifiedAt,omitempty"` + ModifiedBy string `bson:"modifiedBy"` + HasStarted bool `bson:"hasStarted"` + ErrorMsg string `bson:"errorMsg"` + ErrorRetryable bool `bson:"errorRetryable"` + ErrorCode *int32 `bson:"errorCode,omitempty"` +} + +func parseStreamProcessorInfo(raw bson.Raw, bsonOpts *options.BSONOptions, reg *bson.Registry) (*StreamProcessorInfo, error) { + if len(raw) == 0 { + return nil, errors.New("empty getStreamProcessor response") + } + info := new(StreamProcessorInfo) + dec := getDecoder(raw, bsonOpts, reg) + if err := dec.Decode(info); err != nil { + return nil, err + } + return info, nil +} diff --git a/mongo/streamprocessing_processors.go b/mongo/streamprocessing_processors.go new file mode 100644 index 0000000000..a8ba12f393 --- /dev/null +++ b/mongo/streamprocessing_processors.go @@ -0,0 +1,170 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/mongoutil" + "go.mongodb.org/mongo-driver/v2/internal/serverselector" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// StreamProcessors is a handle for managing stream processors in a workspace. +type StreamProcessors struct { + client *Client +} + +// Create issues a createStreamProcessor command for a new processor with the +// given name and aggregation pipeline. +// +// The pipeline must be an order-preserving Go value (e.g. mongo.Pipeline, +// []bson.D, []bson.Raw). Map types are rejected. +func (sps *StreamProcessors) Create( + ctx context.Context, + name string, + pipeline any, + opts ...options.Lister[options.CreateStreamProcessorOptions], +) error { + if ctx == nil { + ctx = context.Background() + } + if name == "" { + return errors.New("stream processor name must not be empty") + } + + pipelineArr, _, err := marshalAggregatePipeline(pipeline, sps.client.bsonOpts, sps.client.registry) + if err != nil { + return err + } + + args, err := mongoutil.NewOptions[options.CreateStreamProcessorOptions](opts...) + if err != nil { + return err + } + + sess, releaseSession, err := sps.acquireSession(ctx) + if err != nil { + return err + } + defer releaseSession() + + op := operation.NewCreateStreamProcessor(name, bsoncore.Document(pipelineArr)) + if args.DLQ != nil { + op = op.DLQ(bsoncore.Document(args.DLQ)) + } + if args.StreamMetaFieldName != nil { + op = op.StreamMetaFieldName(*args.StreamMetaFieldName) + } + if args.Tier != nil { + op = op.Tier(*args.Tier) + } + if args.FailoverEnabled != nil { + op = op.FailoverEnabled(*args.FailoverEnabled) + } + + op = op. + Session(sess). + ClusterClock(sps.client.clock). + CommandMonitor(sps.client.monitor). + Crypt(sps.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sps.client.deployment). + ServerSelector(sps.writeSelector()). + ServerAPI(sps.client.serverAPI). + Authenticator(sps.client.authenticator) + + return op.Execute(ctx) +} + +// Get returns a handle for an existing stream processor by name. Get does not +// verify that a processor with this name exists on the server. +func (sps *StreamProcessors) Get(name string) *StreamProcessor { + return &StreamProcessor{parent: sps, name: name} +} + +// GetInfo issues a getStreamProcessor command for the processor with the +// given name. +func (sps *StreamProcessors) GetInfo(ctx context.Context, name string) (*StreamProcessorInfo, error) { + if ctx == nil { + ctx = context.Background() + } + if name == "" { + return nil, errors.New("stream processor name must not be empty") + } + + sess, releaseSession, err := sps.acquireSession(ctx) + if err != nil { + return nil, err + } + defer releaseSession() + + op := operation.NewGetStreamProcessor(name). + Session(sess). + ClusterClock(sps.client.clock). + CommandMonitor(sps.client.monitor). + Crypt(sps.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sps.client.deployment). + ServerSelector(sps.readSelector()). + ReadPreference(sps.client.readPreference). + ServerAPI(sps.client.serverAPI). + Authenticator(sps.client.authenticator). + Retry(driver.RetryOncePerCommand) + + if err := op.Execute(ctx); err != nil { + return nil, err + } + return parseStreamProcessorInfo(bson.Raw(op.Result()), sps.client.bsonOpts, sps.client.registry) +} + +// acquireSession returns either the explicit session from the context or an +// implicit session from the client's session pool. The returned release +// callback ends the implicit session (if any). +func (sps *StreamProcessors) acquireSession(ctx context.Context) (*session.Client, func(), error) { + sess := sessionFromContext(ctx) + implicit := false + if sess == nil && sps.client.sessionPool != nil { + sess = session.NewImplicitClientSession(sps.client.sessionPool, sps.client.id) + implicit = true + } + if err := sps.client.validSession(sess); err != nil { + if implicit { + closeImplicitSession(sess) + } + return nil, func() {}, err + } + if implicit { + return sess, func() { closeImplicitSession(sess) }, nil + } + return sess, func() {}, nil +} + +func (sps *StreamProcessors) readSelector() description.ServerSelector { + return &serverselector.Composite{ + Selectors: []description.ServerSelector{ + &serverselector.ReadPref{ReadPref: sps.client.readPreference}, + &serverselector.Latency{Latency: sps.client.localThreshold}, + }, + } +} + +func (sps *StreamProcessors) writeSelector() description.ServerSelector { + return &serverselector.Composite{ + Selectors: []description.ServerSelector{ + &serverselector.Write{}, + &serverselector.Latency{Latency: sps.client.localThreshold}, + }, + } +} diff --git a/mongo/streamprocessing_samples.go b/mongo/streamprocessing_samples.go new file mode 100644 index 0000000000..31557f9951 --- /dev/null +++ b/mongo/streamprocessing_samples.go @@ -0,0 +1,118 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "context" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/mongoutil" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" +) + +// GetStreamProcessorSamplesResult is the result of a +// GetStreamProcessorSamples call. +// +// CursorID is the cursor identifier to pass to the next call. A value of 0 +// means the cursor has been exhausted; callers MUST NOT issue another +// GetStreamProcessorSamples for that cursor. +type GetStreamProcessorSamplesResult struct { + CursorID int64 + Documents []bson.Raw +} + +// GetStreamProcessorSamples retrieves a batch of sampled documents from a +// running stream processor. It abstracts over the two-phase wire protocol: +// +// - If CursorID is absent or zero, GetStreamProcessorSamples sends +// startSampleStreamProcessor (using Limit if provided) and then +// immediately issues a getMoreSampleStreamProcessor on the returned +// cursorId so that the caller receives documents on the first call. +// - Otherwise it sends a single getMoreSampleStreamProcessor (using +// BatchSize if provided) with the supplied CursorID. +// +// Callers MUST stop iterating when the returned CursorID is 0. +func (sp *StreamProcessor) GetStreamProcessorSamples( + ctx context.Context, + opts ...options.Lister[options.GetStreamProcessorSamplesOptions], +) (*GetStreamProcessorSamplesResult, error) { + if ctx == nil { + ctx = context.Background() + } + + args, err := mongoutil.NewOptions[options.GetStreamProcessorSamplesOptions](opts...) + if err != nil { + return nil, err + } + + sess, release, err := sp.parent.acquireSession(ctx) + if err != nil { + return nil, err + } + defer release() + + cursorID := int64(0) + if args.CursorID != nil { + cursorID = *args.CursorID + } + + if cursorID == 0 { + startOp := operation.NewStartSampleStreamProcessor(sp.name) + if args.Limit != nil { + startOp = startOp.Limit(*args.Limit) + } + startOp = startOp. + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.readSelector()). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator) + if err := startOp.Execute(ctx); err != nil { + return nil, err + } + cursorID = startOp.CursorID() + if cursorID == 0 { + // Server returned an exhausted cursor immediately. + return &GetStreamProcessorSamplesResult{}, nil + } + } + + getMoreOp := operation.NewGetMoreSampleStreamProcessor(sp.name, cursorID) + if args.BatchSize != nil { + getMoreOp = getMoreOp.BatchSize(*args.BatchSize) + } + getMoreOp = getMoreOp. + Session(sess). + ClusterClock(sp.parent.client.clock). + CommandMonitor(sp.parent.client.monitor). + Crypt(sp.parent.client.cryptFLE). + Database(streamProcessingAdminDB). + Deployment(sp.parent.client.deployment). + ServerSelector(sp.parent.readSelector()). + ServerAPI(sp.parent.client.serverAPI). + Authenticator(sp.parent.client.authenticator) + + if err := getMoreOp.Execute(ctx); err != nil { + return nil, err + } + + batch := getMoreOp.ResultBatch() + docs := make([]bson.Raw, len(batch)) + for i, d := range batch { + // Copy so the caller can retain documents past the next Execute. + docs[i] = append(bson.Raw(nil), bson.Raw(d)...) + } + return &GetStreamProcessorSamplesResult{ + CursorID: getMoreOp.ResultCursorID(), + Documents: docs, + }, nil +} diff --git a/mongo/streamprocessing_test.go b/mongo/streamprocessing_test.go new file mode 100644 index 0000000000..9c5a995862 --- /dev/null +++ b/mongo/streamprocessing_test.go @@ -0,0 +1,69 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package mongo + +import ( + "crypto/tls" + "testing" + + "go.mongodb.org/mongo-driver/v2/internal/assert" + "go.mongodb.org/mongo-driver/v2/internal/require" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +func TestApplyStreamProcessingDefaults_EnablesTLS(t *testing.T) { + opts := options.Client() + applyStreamProcessingDefaults(opts) + require.NotNil(t, opts.TLSConfig) +} + +func TestApplyStreamProcessingDefaults_PreservesExistingTLS(t *testing.T) { + cfg := &tls.Config{ServerName: "example"} + opts := options.Client().SetTLSConfig(cfg) + applyStreamProcessingDefaults(opts) + require.NotNil(t, opts.TLSConfig) + assert.Equal(t, "example", opts.TLSConfig.ServerName) +} + +func TestApplyStreamProcessingDefaults_DefaultsAuthSourceToAdmin(t *testing.T) { + opts := options.Client().SetAuth(options.Credential{Username: "u", Password: "p"}) + applyStreamProcessingDefaults(opts) + require.NotNil(t, opts.Auth) + assert.Equal(t, "admin", opts.Auth.AuthSource) +} + +func TestApplyStreamProcessingDefaults_PreservesExplicitAuthSource(t *testing.T) { + opts := options.Client().SetAuth(options.Credential{Username: "u", Password: "p", AuthSource: "elsewhere"}) + applyStreamProcessingDefaults(opts) + require.NotNil(t, opts.Auth) + assert.Equal(t, "elsewhere", opts.Auth.AuthSource) +} + +func TestApplyStreamProcessingDefaults_NoAuthLeavesAuthNil(t *testing.T) { + opts := options.Client() + applyStreamProcessingDefaults(opts) + assert.Nil(t, opts.Auth) +} + +func TestIsStreamProcessingHost(t *testing.T) { + cases := []struct { + host string + want bool + }{ + {"atlas-stream-699c842ef433fe6001480b17-etif1.virginia-usa.a.query.mongodb.net", true}, + {"atlas-stream-x.us-west.a.query.mongodb.net", true}, + {"cluster0.mongodb.net", false}, + {"localhost", false}, + {"atlas-stream-not-the-right-suffix.mongodb.net", false}, + {"", false}, + } + for _, tc := range cases { + t.Run(tc.host, func(t *testing.T) { + assert.Equal(t, tc.want, IsStreamProcessingHost(tc.host)) + }) + } +} diff --git a/streamprocessing/doc.go b/streamprocessing/doc.go new file mode 100644 index 0000000000..3ba4c8428a --- /dev/null +++ b/streamprocessing/doc.go @@ -0,0 +1,37 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +// Package streamprocessing exposes a client for Atlas Stream Processing +// (ASP) workspaces. +// +// A workspace is a dedicated Atlas endpoint that hosts one or more stream +// processors. Workspace endpoints use the standard "mongodb://" URI scheme; +// the hostname follows the pattern +// +// mongodb://atlas-stream--..a.query.mongodb.net/ +// +// TLS is required and authentication uses the admin database by default. +// +// The package re-exports a small set of types from the parent mongo package +// so callers can write +// +// import ( +// "go.mongodb.org/mongo-driver/v2/streamprocessing" +// "go.mongodb.org/mongo-driver/v2/mongo/options" +// ) +// +// client, err := streamprocessing.Connect(options.Client().ApplyURI(uri)) +// if err != nil { ... } +// defer client.Disconnect(ctx) +// +// sps := client.StreamProcessors() +// if err := sps.Create(ctx, "agg-1", pipeline); err != nil { ... } +// sp := sps.Get("agg-1") +// if err := sp.Start(ctx, nil); err != nil { ... } +// +// The wire protocol and behavioral semantics follow the Atlas Stream +// Processing driver specification. +package streamprocessing diff --git a/streamprocessing/streamprocessing.go b/streamprocessing/streamprocessing.go new file mode 100644 index 0000000000..b6ba4bb8ae --- /dev/null +++ b/streamprocessing/streamprocessing.go @@ -0,0 +1,47 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package streamprocessing + +import ( + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +// Client is a handle for a connection to an Atlas Stream Processing +// workspace. It is a type alias for [mongo.StreamProcessingClient]. +type Client = mongo.StreamProcessingClient + +// StreamProcessors is a handle for managing stream processors in a +// workspace. It is a type alias for [mongo.StreamProcessors]. +type StreamProcessors = mongo.StreamProcessors + +// StreamProcessor is a handle for a single named stream processor. It is a +// type alias for [mongo.StreamProcessor]. +type StreamProcessor = mongo.StreamProcessor + +// Info describes a single stream processor as returned by getStreamProcessor. +// It is a type alias for [mongo.StreamProcessorInfo]. +type Info = mongo.StreamProcessorInfo + +// SamplesResult is the result of a GetStreamProcessorSamples call. It is a +// type alias for [mongo.GetStreamProcessorSamplesResult]. +type SamplesResult = mongo.GetStreamProcessorSamplesResult + +// Connect creates a new [Client] for an Atlas Stream Processing workspace. +// +// Connect enforces TLS and defaults authSource to "admin"; otherwise it +// behaves identically to [mongo.Connect]. +func Connect(opts ...*options.ClientOptions) (*Client, error) { + return mongo.ConnectStreamProcessing(opts...) +} + +// IsWorkspaceHost reports whether the given hostname looks like an Atlas +// Stream Processing workspace endpoint (matches the pattern +// "atlas-stream-*.a.query.mongodb.net"). Detection is advisory only. +func IsWorkspaceHost(host string) bool { + return mongo.IsStreamProcessingHost(host) +} diff --git a/x/mongo/driver/operation/create_stream_processor.go b/x/mongo/driver/operation/create_stream_processor.go new file mode 100644 index 0000000000..3ac53eada1 --- /dev/null +++ b/x/mongo/driver/operation/create_stream_processor.go @@ -0,0 +1,228 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// CreateStreamProcessor performs a createStreamProcessor operation against an +// Atlas Stream Processing workspace. +// +// Wire shape: +// +// { +// createStreamProcessor: , +// pipeline: , +// options: { +// dlq: , +// streamMetaFieldName: , +// tier: , +// failover: +// } +// } +// +// The options sub-document is only included if at least one option is set. +type CreateStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + + name string + pipeline bsoncore.Document + dlq bsoncore.Document + streamMetaFieldName *string + tier *string + failoverEnabled *bool +} + +// NewCreateStreamProcessor constructs a new CreateStreamProcessor for the +// given processor name and aggregation pipeline. +func NewCreateStreamProcessor(name string, pipeline bsoncore.Document) *CreateStreamProcessor { + return &CreateStreamProcessor{name: name, pipeline: pipeline} +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (csp *CreateStreamProcessor) Execute(ctx context.Context) error { + if csp.deployment == nil { + return errors.New("the CreateStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: csp.command, + Client: csp.session, + Clock: csp.clock, + CommandMonitor: csp.monitor, + Crypt: csp.crypt, + Database: csp.database, + Deployment: csp.deployment, + Selector: csp.selector, + ServerAPI: csp.serverAPI, + Name: driverutil.CreateStreamProcessorOp, + Authenticator: csp.authenticator, + }.Execute(ctx) +} + +func (csp *CreateStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "createStreamProcessor", csp.name) + if csp.pipeline != nil { + dst = bsoncore.AppendArrayElement(dst, "pipeline", csp.pipeline) + } + if csp.dlq != nil || csp.streamMetaFieldName != nil || csp.tier != nil || csp.failoverEnabled != nil { + var optsIdx int32 + optsIdx, dst = bsoncore.AppendDocumentElementStart(dst, "options") + if csp.dlq != nil { + dst = bsoncore.AppendDocumentElement(dst, "dlq", csp.dlq) + } + if csp.streamMetaFieldName != nil { + dst = bsoncore.AppendStringElement(dst, "streamMetaFieldName", *csp.streamMetaFieldName) + } + if csp.tier != nil { + dst = bsoncore.AppendStringElement(dst, "tier", *csp.tier) + } + if csp.failoverEnabled != nil { + dst = bsoncore.AppendBooleanElement(dst, "failover", *csp.failoverEnabled) + } + var err error + dst, err = bsoncore.AppendDocumentEnd(dst, optsIdx) + if err != nil { + return nil, err + } + } + return dst, nil +} + +// DLQ sets the dead-letter-queue configuration document. +func (csp *CreateStreamProcessor) DLQ(dlq bsoncore.Document) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.dlq = dlq + return csp +} + +// StreamMetaFieldName sets the field name to use for stream metadata. +func (csp *CreateStreamProcessor) StreamMetaFieldName(name string) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.streamMetaFieldName = &name + return csp +} + +// Tier sets the compute tier for the new processor. +func (csp *CreateStreamProcessor) Tier(tier string) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.tier = &tier + return csp +} + +// FailoverEnabled sets whether failover should be enabled. +func (csp *CreateStreamProcessor) FailoverEnabled(b bool) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.failoverEnabled = &b + return csp +} + +// Session sets the session for this operation. +func (csp *CreateStreamProcessor) Session(s *session.Client) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.session = s + return csp +} + +// ClusterClock sets the cluster clock for this operation. +func (csp *CreateStreamProcessor) ClusterClock(c *session.ClusterClock) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.clock = c + return csp +} + +// CommandMonitor sets the monitor to use for APM events. +func (csp *CreateStreamProcessor) CommandMonitor(m *event.CommandMonitor) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.monitor = m + return csp +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (csp *CreateStreamProcessor) Crypt(c driver.Crypt) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.crypt = c + return csp +} + +// Database sets the database to run this operation against. +func (csp *CreateStreamProcessor) Database(database string) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.database = database + return csp +} + +// Deployment sets the deployment to use for this operation. +func (csp *CreateStreamProcessor) Deployment(d driver.Deployment) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.deployment = d + return csp +} + +// ServerSelector sets the selector used to retrieve a server. +func (csp *CreateStreamProcessor) ServerSelector(s description.ServerSelector) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.selector = s + return csp +} + +// ServerAPI sets the server API version for this operation. +func (csp *CreateStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.serverAPI = s + return csp +} + +// Authenticator sets the authenticator to use for this operation. +func (csp *CreateStreamProcessor) Authenticator(a driver.Authenticator) *CreateStreamProcessor { + if csp == nil { + csp = new(CreateStreamProcessor) + } + csp.authenticator = a + return csp +} diff --git a/x/mongo/driver/operation/drop_stream_processor.go b/x/mongo/driver/operation/drop_stream_processor.go new file mode 100644 index 0000000000..8ddaccbcf2 --- /dev/null +++ b/x/mongo/driver/operation/drop_stream_processor.go @@ -0,0 +1,146 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// DropStreamProcessor performs a dropStreamProcessor operation against an +// Atlas Stream Processing workspace. +type DropStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + name string +} + +// NewDropStreamProcessor constructs and returns a new DropStreamProcessor. +func NewDropStreamProcessor(name string) *DropStreamProcessor { + return &DropStreamProcessor{name: name} +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (dsp *DropStreamProcessor) Execute(ctx context.Context) error { + if dsp.deployment == nil { + return errors.New("the DropStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: dsp.command, + Client: dsp.session, + Clock: dsp.clock, + CommandMonitor: dsp.monitor, + Crypt: dsp.crypt, + Database: dsp.database, + Deployment: dsp.deployment, + Selector: dsp.selector, + ServerAPI: dsp.serverAPI, + Name: driverutil.DropStreamProcessorOp, + Authenticator: dsp.authenticator, + }.Execute(ctx) +} + +func (dsp *DropStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "dropStreamProcessor", dsp.name) + return dst, nil +} + +// Session sets the session for this operation. +func (dsp *DropStreamProcessor) Session(s *session.Client) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.session = s + return dsp +} + +// ClusterClock sets the cluster clock for this operation. +func (dsp *DropStreamProcessor) ClusterClock(c *session.ClusterClock) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.clock = c + return dsp +} + +// CommandMonitor sets the monitor to use for APM events. +func (dsp *DropStreamProcessor) CommandMonitor(m *event.CommandMonitor) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.monitor = m + return dsp +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (dsp *DropStreamProcessor) Crypt(c driver.Crypt) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.crypt = c + return dsp +} + +// Database sets the database to run this operation against. +func (dsp *DropStreamProcessor) Database(database string) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.database = database + return dsp +} + +// Deployment sets the deployment to use for this operation. +func (dsp *DropStreamProcessor) Deployment(d driver.Deployment) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.deployment = d + return dsp +} + +// ServerSelector sets the selector used to retrieve a server. +func (dsp *DropStreamProcessor) ServerSelector(s description.ServerSelector) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.selector = s + return dsp +} + +// ServerAPI sets the server API version for this operation. +func (dsp *DropStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.serverAPI = s + return dsp +} + +// Authenticator sets the authenticator to use for this operation. +func (dsp *DropStreamProcessor) Authenticator(a driver.Authenticator) *DropStreamProcessor { + if dsp == nil { + dsp = new(DropStreamProcessor) + } + dsp.authenticator = a + return dsp +} diff --git a/x/mongo/driver/operation/get_more_sample_stream_processor.go b/x/mongo/driver/operation/get_more_sample_stream_processor.go new file mode 100644 index 0000000000..d13eb8495f --- /dev/null +++ b/x/mongo/driver/operation/get_more_sample_stream_processor.go @@ -0,0 +1,212 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + "fmt" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// GetMoreSampleStreamProcessor performs a getMoreSampleStreamProcessor +// operation. The response carries a cursorId (0 means exhausted) and a +// nextBatch of documents. +type GetMoreSampleStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + + name string + cursorID int64 + batchSize *int32 + + resultCursorID int64 + resultBatch []bsoncore.Document +} + +// NewGetMoreSampleStreamProcessor constructs a new GetMoreSampleStreamProcessor. +func NewGetMoreSampleStreamProcessor(name string, cursorID int64) *GetMoreSampleStreamProcessor { + return &GetMoreSampleStreamProcessor{name: name, cursorID: cursorID} +} + +// ResultCursorID returns the cursor ID from the most recent response. 0 means +// the cursor is exhausted; callers MUST NOT issue another getMore. +func (op *GetMoreSampleStreamProcessor) ResultCursorID() int64 { return op.resultCursorID } + +// ResultBatch returns the documents from the most recent response. The +// returned slice references memory owned by the operation; callers that need +// to retain documents past the next Execute should copy them. +func (op *GetMoreSampleStreamProcessor) ResultBatch() []bsoncore.Document { return op.resultBatch } + +func (op *GetMoreSampleStreamProcessor) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { + idVal, err := resp.LookupErr("cursorId") + if err != nil { + return fmt.Errorf("getMoreSampleStreamProcessor response is missing cursorId: %w", err) + } + id, ok := idVal.AsInt64OK() + if !ok { + return errors.New("getMoreSampleStreamProcessor response cursorId is not a number") + } + op.resultCursorID = id + + op.resultBatch = op.resultBatch[:0] + batchVal, err := resp.LookupErr("nextBatch") + if err != nil { + // nextBatch may be absent for an exhausted cursor; treat as empty. + return nil + } + arr, ok := batchVal.ArrayOK() + if !ok { + return errors.New("getMoreSampleStreamProcessor response nextBatch is not an array") + } + vals, err := bsoncore.Document(arr).Elements() + if err != nil { + return fmt.Errorf("getMoreSampleStreamProcessor response nextBatch is malformed: %w", err) + } + for _, elem := range vals { + doc, ok := elem.Value().DocumentOK() + if !ok { + return errors.New("getMoreSampleStreamProcessor response nextBatch element is not a document") + } + op.resultBatch = append(op.resultBatch, doc) + } + return nil +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (op *GetMoreSampleStreamProcessor) Execute(ctx context.Context) error { + if op.deployment == nil { + return errors.New("the GetMoreSampleStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: op.command, + ProcessResponseFn: op.processResponse, + Client: op.session, + Clock: op.clock, + CommandMonitor: op.monitor, + Crypt: op.crypt, + Database: op.database, + Deployment: op.deployment, + Selector: op.selector, + ServerAPI: op.serverAPI, + Name: driverutil.GetMoreSampleStreamProcessorOp, + Authenticator: op.authenticator, + }.Execute(ctx) +} + +func (op *GetMoreSampleStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "getMoreSampleStreamProcessor", op.name) + dst = bsoncore.AppendInt64Element(dst, "cursorId", op.cursorID) + if op.batchSize != nil { + dst = bsoncore.AppendInt32Element(dst, "batchSize", *op.batchSize) + } + return dst, nil +} + +// BatchSize sets the desired batch size for this fetch. +func (op *GetMoreSampleStreamProcessor) BatchSize(n int32) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.batchSize = &n + return op +} + +// Session sets the session for this operation. +func (op *GetMoreSampleStreamProcessor) Session(s *session.Client) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.session = s + return op +} + +// ClusterClock sets the cluster clock for this operation. +func (op *GetMoreSampleStreamProcessor) ClusterClock(c *session.ClusterClock) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.clock = c + return op +} + +// CommandMonitor sets the monitor to use for APM events. +func (op *GetMoreSampleStreamProcessor) CommandMonitor(m *event.CommandMonitor) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.monitor = m + return op +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (op *GetMoreSampleStreamProcessor) Crypt(c driver.Crypt) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.crypt = c + return op +} + +// Database sets the database to run this operation against. +func (op *GetMoreSampleStreamProcessor) Database(database string) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.database = database + return op +} + +// Deployment sets the deployment to use for this operation. +func (op *GetMoreSampleStreamProcessor) Deployment(d driver.Deployment) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.deployment = d + return op +} + +// ServerSelector sets the selector used to retrieve a server. +func (op *GetMoreSampleStreamProcessor) ServerSelector(s description.ServerSelector) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.selector = s + return op +} + +// ServerAPI sets the server API version for this operation. +func (op *GetMoreSampleStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.serverAPI = s + return op +} + +// Authenticator sets the authenticator to use for this operation. +func (op *GetMoreSampleStreamProcessor) Authenticator(a driver.Authenticator) *GetMoreSampleStreamProcessor { + if op == nil { + op = new(GetMoreSampleStreamProcessor) + } + op.authenticator = a + return op +} diff --git a/x/mongo/driver/operation/get_stream_processor.go b/x/mongo/driver/operation/get_stream_processor.go new file mode 100644 index 0000000000..7269cddc61 --- /dev/null +++ b/x/mongo/driver/operation/get_stream_processor.go @@ -0,0 +1,183 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// GetStreamProcessor performs a getStreamProcessor operation. The full +// response document is captured raw and returned via Result so the caller can +// decode whatever fields it needs (tolerating unknown server-internal fields). +type GetStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + readPreference *readpref.ReadPref + retry *driver.RetryMode + + name string + result bsoncore.Document +} + +// NewGetStreamProcessor constructs a new GetStreamProcessor. +func NewGetStreamProcessor(name string) *GetStreamProcessor { + return &GetStreamProcessor{name: name} +} + +// Result returns the raw server response from the most recent successful +// Execute call. +func (gsp *GetStreamProcessor) Result() bsoncore.Document { return gsp.result } + +func (gsp *GetStreamProcessor) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { + gsp.result = resp + return nil +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (gsp *GetStreamProcessor) Execute(ctx context.Context) error { + if gsp.deployment == nil { + return errors.New("the GetStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: gsp.command, + ProcessResponseFn: gsp.processResponse, + RetryMode: gsp.retry, + Type: driver.Read, + Client: gsp.session, + Clock: gsp.clock, + CommandMonitor: gsp.monitor, + Crypt: gsp.crypt, + Database: gsp.database, + Deployment: gsp.deployment, + ReadPreference: gsp.readPreference, + Selector: gsp.selector, + ServerAPI: gsp.serverAPI, + Name: driverutil.GetStreamProcessorOp, + Authenticator: gsp.authenticator, + }.Execute(ctx) +} + +func (gsp *GetStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "getStreamProcessor", gsp.name) + return dst, nil +} + +// Retry configures the retry mode for this operation. +func (gsp *GetStreamProcessor) Retry(retry driver.RetryMode) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.retry = &retry + return gsp +} + +// ReadPreference sets the read preference for this operation. +func (gsp *GetStreamProcessor) ReadPreference(rp *readpref.ReadPref) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.readPreference = rp + return gsp +} + +// Session sets the session for this operation. +func (gsp *GetStreamProcessor) Session(s *session.Client) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.session = s + return gsp +} + +// ClusterClock sets the cluster clock for this operation. +func (gsp *GetStreamProcessor) ClusterClock(c *session.ClusterClock) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.clock = c + return gsp +} + +// CommandMonitor sets the monitor to use for APM events. +func (gsp *GetStreamProcessor) CommandMonitor(m *event.CommandMonitor) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.monitor = m + return gsp +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (gsp *GetStreamProcessor) Crypt(c driver.Crypt) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.crypt = c + return gsp +} + +// Database sets the database to run this operation against. +func (gsp *GetStreamProcessor) Database(database string) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.database = database + return gsp +} + +// Deployment sets the deployment to use for this operation. +func (gsp *GetStreamProcessor) Deployment(d driver.Deployment) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.deployment = d + return gsp +} + +// ServerSelector sets the selector used to retrieve a server. +func (gsp *GetStreamProcessor) ServerSelector(s description.ServerSelector) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.selector = s + return gsp +} + +// ServerAPI sets the server API version for this operation. +func (gsp *GetStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.serverAPI = s + return gsp +} + +// Authenticator sets the authenticator to use for this operation. +func (gsp *GetStreamProcessor) Authenticator(a driver.Authenticator) *GetStreamProcessor { + if gsp == nil { + gsp = new(GetStreamProcessor) + } + gsp.authenticator = a + return gsp +} diff --git a/x/mongo/driver/operation/get_stream_processor_stats.go b/x/mongo/driver/operation/get_stream_processor_stats.go new file mode 100644 index 0000000000..40ea49c753 --- /dev/null +++ b/x/mongo/driver/operation/get_stream_processor_stats.go @@ -0,0 +1,201 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/mongo/readpref" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// GetStreamProcessorStats performs a getStreamProcessorStats operation. The +// full response document is captured raw. +type GetStreamProcessorStats struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + readPreference *readpref.ReadPref + retry *driver.RetryMode + + name string + verbose *bool + result bsoncore.Document +} + +// NewGetStreamProcessorStats constructs a new GetStreamProcessorStats. +func NewGetStreamProcessorStats(name string) *GetStreamProcessorStats { + return &GetStreamProcessorStats{name: name} +} + +// Result returns the raw server response. +func (gsps *GetStreamProcessorStats) Result() bsoncore.Document { return gsps.result } + +func (gsps *GetStreamProcessorStats) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { + gsps.result = resp + return nil +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (gsps *GetStreamProcessorStats) Execute(ctx context.Context) error { + if gsps.deployment == nil { + return errors.New("the GetStreamProcessorStats operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: gsps.command, + ProcessResponseFn: gsps.processResponse, + RetryMode: gsps.retry, + Type: driver.Read, + Client: gsps.session, + Clock: gsps.clock, + CommandMonitor: gsps.monitor, + Crypt: gsps.crypt, + Database: gsps.database, + Deployment: gsps.deployment, + ReadPreference: gsps.readPreference, + Selector: gsps.selector, + ServerAPI: gsps.serverAPI, + Name: driverutil.GetStreamProcessorStatsOp, + Authenticator: gsps.authenticator, + }.Execute(ctx) +} + +func (gsps *GetStreamProcessorStats) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "getStreamProcessorStats", gsps.name) + if gsps.verbose != nil { + var optsIdx int32 + optsIdx, dst = bsoncore.AppendDocumentElementStart(dst, "options") + dst = bsoncore.AppendBooleanElement(dst, "verbose", *gsps.verbose) + var err error + dst, err = bsoncore.AppendDocumentEnd(dst, optsIdx) + if err != nil { + return nil, err + } + } + return dst, nil +} + +// Verbose sets the options.verbose flag. +func (gsps *GetStreamProcessorStats) Verbose(b bool) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.verbose = &b + return gsps +} + +// Retry configures the retry mode for this operation. +func (gsps *GetStreamProcessorStats) Retry(retry driver.RetryMode) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.retry = &retry + return gsps +} + +// ReadPreference sets the read preference for this operation. +func (gsps *GetStreamProcessorStats) ReadPreference(rp *readpref.ReadPref) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.readPreference = rp + return gsps +} + +// Session sets the session for this operation. +func (gsps *GetStreamProcessorStats) Session(s *session.Client) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.session = s + return gsps +} + +// ClusterClock sets the cluster clock for this operation. +func (gsps *GetStreamProcessorStats) ClusterClock(c *session.ClusterClock) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.clock = c + return gsps +} + +// CommandMonitor sets the monitor to use for APM events. +func (gsps *GetStreamProcessorStats) CommandMonitor(m *event.CommandMonitor) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.monitor = m + return gsps +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (gsps *GetStreamProcessorStats) Crypt(c driver.Crypt) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.crypt = c + return gsps +} + +// Database sets the database to run this operation against. +func (gsps *GetStreamProcessorStats) Database(database string) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.database = database + return gsps +} + +// Deployment sets the deployment to use for this operation. +func (gsps *GetStreamProcessorStats) Deployment(d driver.Deployment) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.deployment = d + return gsps +} + +// ServerSelector sets the selector used to retrieve a server. +func (gsps *GetStreamProcessorStats) ServerSelector(s description.ServerSelector) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.selector = s + return gsps +} + +// ServerAPI sets the server API version for this operation. +func (gsps *GetStreamProcessorStats) ServerAPI(s *driver.ServerAPIOptions) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.serverAPI = s + return gsps +} + +// Authenticator sets the authenticator to use for this operation. +func (gsps *GetStreamProcessorStats) Authenticator(a driver.Authenticator) *GetStreamProcessorStats { + if gsps == nil { + gsps = new(GetStreamProcessorStats) + } + gsps.authenticator = a + return gsps +} diff --git a/x/mongo/driver/operation/start_sample_stream_processor.go b/x/mongo/driver/operation/start_sample_stream_processor.go new file mode 100644 index 0000000000..c1fa3135e2 --- /dev/null +++ b/x/mongo/driver/operation/start_sample_stream_processor.go @@ -0,0 +1,181 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + "fmt" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// StartSampleStreamProcessor performs a startSampleStreamProcessor operation. +// On success the server returns a cursorId; documents are retrieved by a +// subsequent GetMoreSampleStreamProcessor call. +type StartSampleStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + + name string + limit *int32 + cursorID int64 +} + +// NewStartSampleStreamProcessor constructs a new StartSampleStreamProcessor. +func NewStartSampleStreamProcessor(name string) *StartSampleStreamProcessor { + return &StartSampleStreamProcessor{name: name} +} + +// CursorID returns the cursor ID returned by the server. Zero before Execute +// succeeds. +func (op *StartSampleStreamProcessor) CursorID() int64 { return op.cursorID } + +func (op *StartSampleStreamProcessor) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error { + val, err := resp.LookupErr("cursorId") + if err != nil { + return fmt.Errorf("startSampleStreamProcessor response is missing cursorId: %w", err) + } + id, ok := val.AsInt64OK() + if !ok { + return errors.New("startSampleStreamProcessor response cursorId is not a number") + } + op.cursorID = id + return nil +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (op *StartSampleStreamProcessor) Execute(ctx context.Context) error { + if op.deployment == nil { + return errors.New("the StartSampleStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: op.command, + ProcessResponseFn: op.processResponse, + Client: op.session, + Clock: op.clock, + CommandMonitor: op.monitor, + Crypt: op.crypt, + Database: op.database, + Deployment: op.deployment, + Selector: op.selector, + ServerAPI: op.serverAPI, + Name: driverutil.StartSampleStreamProcessorOp, + Authenticator: op.authenticator, + }.Execute(ctx) +} + +func (op *StartSampleStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "startSampleStreamProcessor", op.name) + if op.limit != nil { + dst = bsoncore.AppendInt32Element(dst, "limit", *op.limit) + } + return dst, nil +} + +// Limit sets the maximum number of documents to sample. +func (op *StartSampleStreamProcessor) Limit(l int32) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.limit = &l + return op +} + +// Session sets the session for this operation. +func (op *StartSampleStreamProcessor) Session(s *session.Client) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.session = s + return op +} + +// ClusterClock sets the cluster clock for this operation. +func (op *StartSampleStreamProcessor) ClusterClock(c *session.ClusterClock) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.clock = c + return op +} + +// CommandMonitor sets the monitor to use for APM events. +func (op *StartSampleStreamProcessor) CommandMonitor(m *event.CommandMonitor) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.monitor = m + return op +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (op *StartSampleStreamProcessor) Crypt(c driver.Crypt) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.crypt = c + return op +} + +// Database sets the database to run this operation against. +func (op *StartSampleStreamProcessor) Database(database string) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.database = database + return op +} + +// Deployment sets the deployment to use for this operation. +func (op *StartSampleStreamProcessor) Deployment(d driver.Deployment) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.deployment = d + return op +} + +// ServerSelector sets the selector used to retrieve a server. +func (op *StartSampleStreamProcessor) ServerSelector(s description.ServerSelector) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.selector = s + return op +} + +// ServerAPI sets the server API version for this operation. +func (op *StartSampleStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.serverAPI = s + return op +} + +// Authenticator sets the authenticator to use for this operation. +func (op *StartSampleStreamProcessor) Authenticator(a driver.Authenticator) *StartSampleStreamProcessor { + if op == nil { + op = new(StartSampleStreamProcessor) + } + op.authenticator = a + return op +} diff --git a/x/mongo/driver/operation/start_stream_processor.go b/x/mongo/driver/operation/start_stream_processor.go new file mode 100644 index 0000000000..ce28e4c917 --- /dev/null +++ b/x/mongo/driver/operation/start_stream_processor.go @@ -0,0 +1,298 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// StartStreamProcessor performs a startStreamProcessor operation against an +// Atlas Stream Processing workspace. +// +// Wire shape: +// +// { +// startStreamProcessor: , +// workers: , +// options: { +// clearCheckpoints: , +// startAtOperationTime: , +// tier: , +// enableAutoScaling: +// }, +// failover: { +// region: , +// mode: , +// dryRun: +// } +// } +// +// The startAfter option is reserved by the spec for future use and is +// intentionally not serialized to the wire. +type StartStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + + name string + workers *int32 + clearCheckpoints *bool + startAtOperation *startAtOperationTime + tier *string + enableAutoScaling *bool + + failoverRegion *string + failoverMode *string + failoverDryRun *bool +} + +type startAtOperationTime struct { + t uint32 + i uint32 +} + +// NewStartStreamProcessor constructs a new StartStreamProcessor. +func NewStartStreamProcessor(name string) *StartStreamProcessor { + return &StartStreamProcessor{name: name} +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (ssp *StartStreamProcessor) Execute(ctx context.Context) error { + if ssp.deployment == nil { + return errors.New("the StartStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: ssp.command, + Client: ssp.session, + Clock: ssp.clock, + CommandMonitor: ssp.monitor, + Crypt: ssp.crypt, + Database: ssp.database, + Deployment: ssp.deployment, + Selector: ssp.selector, + ServerAPI: ssp.serverAPI, + Name: driverutil.StartStreamProcessorOp, + Authenticator: ssp.authenticator, + }.Execute(ctx) +} + +func (ssp *StartStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "startStreamProcessor", ssp.name) + if ssp.workers != nil { + dst = bsoncore.AppendInt32Element(dst, "workers", *ssp.workers) + } + if ssp.clearCheckpoints != nil || ssp.startAtOperation != nil || ssp.tier != nil || ssp.enableAutoScaling != nil { + var optsIdx int32 + optsIdx, dst = bsoncore.AppendDocumentElementStart(dst, "options") + if ssp.clearCheckpoints != nil { + dst = bsoncore.AppendBooleanElement(dst, "clearCheckpoints", *ssp.clearCheckpoints) + } + if ssp.startAtOperation != nil { + dst = bsoncore.AppendTimestampElement(dst, "startAtOperationTime", ssp.startAtOperation.t, ssp.startAtOperation.i) + } + if ssp.tier != nil { + dst = bsoncore.AppendStringElement(dst, "tier", *ssp.tier) + } + if ssp.enableAutoScaling != nil { + dst = bsoncore.AppendBooleanElement(dst, "enableAutoScaling", *ssp.enableAutoScaling) + } + var err error + dst, err = bsoncore.AppendDocumentEnd(dst, optsIdx) + if err != nil { + return nil, err + } + } + if ssp.failoverRegion != nil || ssp.failoverMode != nil || ssp.failoverDryRun != nil { + if ssp.failoverRegion == nil { + return nil, errors.New("failover requires a target region") + } + var failIdx int32 + failIdx, dst = bsoncore.AppendDocumentElementStart(dst, "failover") + dst = bsoncore.AppendStringElement(dst, "region", *ssp.failoverRegion) + if ssp.failoverMode != nil { + dst = bsoncore.AppendStringElement(dst, "mode", *ssp.failoverMode) + } + if ssp.failoverDryRun != nil { + dst = bsoncore.AppendBooleanElement(dst, "dryRun", *ssp.failoverDryRun) + } + var err error + dst, err = bsoncore.AppendDocumentEnd(dst, failIdx) + if err != nil { + return nil, err + } + } + return dst, nil +} + +// Workers sets the workers field on the command. +func (ssp *StartStreamProcessor) Workers(w int32) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.workers = &w + return ssp +} + +// ClearCheckpoints sets the options.clearCheckpoints flag. +func (ssp *StartStreamProcessor) ClearCheckpoints(b bool) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.clearCheckpoints = &b + return ssp +} + +// StartAtOperationTime sets the options.startAtOperationTime BSON timestamp. +func (ssp *StartStreamProcessor) StartAtOperationTime(t, i uint32) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.startAtOperation = &startAtOperationTime{t: t, i: i} + return ssp +} + +// Tier sets the options.tier value. +func (ssp *StartStreamProcessor) Tier(tier string) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.tier = &tier + return ssp +} + +// EnableAutoScaling sets the options.enableAutoScaling flag. +func (ssp *StartStreamProcessor) EnableAutoScaling(b bool) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.enableAutoScaling = &b + return ssp +} + +// FailoverRegion sets the failover.region value. Required when any failover +// field is set. +func (ssp *StartStreamProcessor) FailoverRegion(region string) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.failoverRegion = ®ion + return ssp +} + +// FailoverMode sets the failover.mode value (e.g. "GRACEFUL", "FORCED"). +func (ssp *StartStreamProcessor) FailoverMode(mode string) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.failoverMode = &mode + return ssp +} + +// FailoverDryRun sets the failover.dryRun flag. +func (ssp *StartStreamProcessor) FailoverDryRun(b bool) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.failoverDryRun = &b + return ssp +} + +// Session sets the session for this operation. +func (ssp *StartStreamProcessor) Session(s *session.Client) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.session = s + return ssp +} + +// ClusterClock sets the cluster clock for this operation. +func (ssp *StartStreamProcessor) ClusterClock(c *session.ClusterClock) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.clock = c + return ssp +} + +// CommandMonitor sets the monitor to use for APM events. +func (ssp *StartStreamProcessor) CommandMonitor(m *event.CommandMonitor) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.monitor = m + return ssp +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (ssp *StartStreamProcessor) Crypt(c driver.Crypt) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.crypt = c + return ssp +} + +// Database sets the database to run this operation against. +func (ssp *StartStreamProcessor) Database(database string) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.database = database + return ssp +} + +// Deployment sets the deployment to use for this operation. +func (ssp *StartStreamProcessor) Deployment(d driver.Deployment) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.deployment = d + return ssp +} + +// ServerSelector sets the selector used to retrieve a server. +func (ssp *StartStreamProcessor) ServerSelector(s description.ServerSelector) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.selector = s + return ssp +} + +// ServerAPI sets the server API version for this operation. +func (ssp *StartStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.serverAPI = s + return ssp +} + +// Authenticator sets the authenticator to use for this operation. +func (ssp *StartStreamProcessor) Authenticator(a driver.Authenticator) *StartStreamProcessor { + if ssp == nil { + ssp = new(StartStreamProcessor) + } + ssp.authenticator = a + return ssp +} diff --git a/x/mongo/driver/operation/stop_stream_processor.go b/x/mongo/driver/operation/stop_stream_processor.go new file mode 100644 index 0000000000..72b5111c88 --- /dev/null +++ b/x/mongo/driver/operation/stop_stream_processor.go @@ -0,0 +1,146 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "context" + "errors" + + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/driverutil" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/session" +) + +// StopStreamProcessor performs a stopStreamProcessor operation against an +// Atlas Stream Processing workspace. +type StopStreamProcessor struct { + authenticator driver.Authenticator + session *session.Client + clock *session.ClusterClock + monitor *event.CommandMonitor + crypt driver.Crypt + database string + deployment driver.Deployment + selector description.ServerSelector + serverAPI *driver.ServerAPIOptions + name string +} + +// NewStopStreamProcessor constructs and returns a new StopStreamProcessor. +func NewStopStreamProcessor(name string) *StopStreamProcessor { + return &StopStreamProcessor{name: name} +} + +// Execute runs this operation and returns an error if it does not execute successfully. +func (ssp *StopStreamProcessor) Execute(ctx context.Context) error { + if ssp.deployment == nil { + return errors.New("the StopStreamProcessor operation must have a Deployment set before Execute can be called") + } + + return driver.Operation{ + CommandFn: ssp.command, + Client: ssp.session, + Clock: ssp.clock, + CommandMonitor: ssp.monitor, + Crypt: ssp.crypt, + Database: ssp.database, + Deployment: ssp.deployment, + Selector: ssp.selector, + ServerAPI: ssp.serverAPI, + Name: driverutil.StopStreamProcessorOp, + Authenticator: ssp.authenticator, + }.Execute(ctx) +} + +func (ssp *StopStreamProcessor) command(dst []byte, _ description.SelectedServer) ([]byte, error) { + dst = bsoncore.AppendStringElement(dst, "stopStreamProcessor", ssp.name) + return dst, nil +} + +// Session sets the session for this operation. +func (ssp *StopStreamProcessor) Session(s *session.Client) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.session = s + return ssp +} + +// ClusterClock sets the cluster clock for this operation. +func (ssp *StopStreamProcessor) ClusterClock(c *session.ClusterClock) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.clock = c + return ssp +} + +// CommandMonitor sets the monitor to use for APM events. +func (ssp *StopStreamProcessor) CommandMonitor(m *event.CommandMonitor) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.monitor = m + return ssp +} + +// Crypt sets the Crypt object to use for automatic encryption and decryption. +func (ssp *StopStreamProcessor) Crypt(c driver.Crypt) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.crypt = c + return ssp +} + +// Database sets the database to run this operation against. +func (ssp *StopStreamProcessor) Database(database string) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.database = database + return ssp +} + +// Deployment sets the deployment to use for this operation. +func (ssp *StopStreamProcessor) Deployment(d driver.Deployment) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.deployment = d + return ssp +} + +// ServerSelector sets the selector used to retrieve a server. +func (ssp *StopStreamProcessor) ServerSelector(s description.ServerSelector) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.selector = s + return ssp +} + +// ServerAPI sets the server API version for this operation. +func (ssp *StopStreamProcessor) ServerAPI(s *driver.ServerAPIOptions) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.serverAPI = s + return ssp +} + +// Authenticator sets the authenticator to use for this operation. +func (ssp *StopStreamProcessor) Authenticator(a driver.Authenticator) *StopStreamProcessor { + if ssp == nil { + ssp = new(StopStreamProcessor) + } + ssp.authenticator = a + return ssp +} diff --git a/x/mongo/driver/operation/stream_processor_test.go b/x/mongo/driver/operation/stream_processor_test.go new file mode 100644 index 0000000000..dc834c7408 --- /dev/null +++ b/x/mongo/driver/operation/stream_processor_test.go @@ -0,0 +1,321 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +package operation + +import ( + "testing" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/internal/assert" + "go.mongodb.org/mongo-driver/v2/internal/require" + "go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver" + "go.mongodb.org/mongo-driver/v2/x/mongo/driver/description" +) + +// buildSPCommand wraps op.command with the BSON document framing so the +// returned byte slice is a valid root-level BSON document. +func buildSPCommand(t *testing.T, build func(dst []byte) ([]byte, error)) bsoncore.Document { + t.Helper() + idx, dst := bsoncore.AppendDocumentStart(nil) + dst, err := build(dst) + require.NoError(t, err) + dst, err = bsoncore.AppendDocumentEnd(dst, idx) + require.NoError(t, err) + doc, _, ok := bsoncore.ReadDocument(dst) + require.True(t, ok) + return doc +} + +func mustBSON(t *testing.T, d bson.D) bsoncore.Document { + t.Helper() + raw, err := bson.Marshal(d) + require.NoError(t, err) + return raw +} + +func TestCreateStreamProcessor_Command(t *testing.T) { + t.Run("minimal", func(t *testing.T) { + stage := mustBSON(t, bson.D{{Key: "$source", Value: bson.D{{Key: "connectionName", Value: "kafka"}}}}) + arrIdx, arr := bsoncore.AppendArrayStart(nil) + arr = bsoncore.AppendDocumentElement(arr, "0", stage) + arr, err := bsoncore.AppendArrayEnd(arr, arrIdx) + require.NoError(t, err) + op := NewCreateStreamProcessor("proc1", bsoncore.Document(arr)) + + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + + // Verify the command name comes first and pipeline is included. + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.GreaterOrEqual(t, len(gotD), 2) + assert.Equal(t, "createStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) + assert.Equal(t, "pipeline", gotD[1].Key) + }) + + t.Run("with options", func(t *testing.T) { + dlq := mustBSON(t, bson.D{{Key: "connectionName", Value: "lostMessages"}}) + op := NewCreateStreamProcessor("proc2", nil). + DLQ(dlq). + StreamMetaFieldName("_stream"). + Tier("SP10"). + FailoverEnabled(true) + + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + + // First key is the command name. + assert.Equal(t, "createStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc2", gotD[0].Value) + + // Find the options sub-doc. + var opts bson.D + for _, e := range gotD { + if e.Key == "options" { + opts = e.Value.(bson.D) + } + } + require.NotNil(t, opts) + seen := map[string]any{} + for _, e := range opts { + seen[e.Key] = e.Value + } + assert.Equal(t, "_stream", seen["streamMetaFieldName"]) + assert.Equal(t, "SP10", seen["tier"]) + assert.Equal(t, true, seen["failover"]) + _, hasDLQ := seen["dlq"] + assert.True(t, hasDLQ, "expected dlq in options") + }) +} + +func TestStartStreamProcessor_Command(t *testing.T) { + t.Run("minimal", func(t *testing.T) { + op := NewStartStreamProcessor("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "startStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) + }) + + t.Run("with failover and options", func(t *testing.T) { + op := NewStartStreamProcessor("proc1"). + Workers(4). + ClearCheckpoints(true). + Tier("SP30"). + EnableAutoScaling(true). + FailoverRegion("us-east-1"). + FailoverMode("GRACEFUL"). + FailoverDryRun(false) + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + + keys := make(map[string]any) + for _, e := range gotD { + keys[e.Key] = e.Value + } + assert.Equal(t, "proc1", keys["startStreamProcessor"]) + assert.Equal(t, int32(4), keys["workers"]) + + opts, ok := keys["options"].(bson.D) + require.True(t, ok) + optsMap := map[string]any{} + for _, e := range opts { + optsMap[e.Key] = e.Value + } + assert.Equal(t, true, optsMap["clearCheckpoints"]) + assert.Equal(t, "SP30", optsMap["tier"]) + assert.Equal(t, true, optsMap["enableAutoScaling"]) + + fail, ok := keys["failover"].(bson.D) + require.True(t, ok) + failMap := map[string]any{} + for _, e := range fail { + failMap[e.Key] = e.Value + } + assert.Equal(t, "us-east-1", failMap["region"]) + assert.Equal(t, "GRACEFUL", failMap["mode"]) + assert.Equal(t, false, failMap["dryRun"]) + }) + + t.Run("failover without region errors", func(t *testing.T) { + op := NewStartStreamProcessor("proc1").FailoverMode("FORCED") + _, err := op.command(nil, description.SelectedServer{}) + require.Error(t, err) + }) + + t.Run("startAtOperationTime is serialized as timestamp", func(t *testing.T) { + op := NewStartStreamProcessor("proc1").StartAtOperationTime(42, 7) + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + var opts bson.D + for _, e := range gotD { + if e.Key == "options" { + opts = e.Value.(bson.D) + } + } + require.NotNil(t, opts) + var ts bson.Timestamp + for _, e := range opts { + if e.Key == "startAtOperationTime" { + ts = e.Value.(bson.Timestamp) + } + } + assert.Equal(t, uint32(42), ts.T) + assert.Equal(t, uint32(7), ts.I) + }) +} + +func TestStopStreamProcessor_Command(t *testing.T) { + op := NewStopStreamProcessor("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "stopStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) +} + +func TestDropStreamProcessor_Command(t *testing.T) { + op := NewDropStreamProcessor("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "dropStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) +} + +func TestGetStreamProcessor_Command(t *testing.T) { + op := NewGetStreamProcessor("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "getStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) +} + +func TestGetStreamProcessorStats_Command(t *testing.T) { + t.Run("minimal", func(t *testing.T) { + op := NewGetStreamProcessorStats("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "getStreamProcessorStats", gotD[0].Key) + }) + + t.Run("verbose", func(t *testing.T) { + op := NewGetStreamProcessorStats("proc1").Verbose(true) + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + var opts bson.D + for _, e := range gotD { + if e.Key == "options" { + opts = e.Value.(bson.D) + } + } + require.NotNil(t, opts) + assert.Equal(t, "verbose", opts[0].Key) + assert.Equal(t, true, opts[0].Value) + }) +} + +func TestStartSampleStreamProcessor_Command(t *testing.T) { + t.Run("no limit", func(t *testing.T) { + op := NewStartSampleStreamProcessor("proc1") + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 1) + assert.Equal(t, "startSampleStreamProcessor", gotD[0].Key) + }) + + t.Run("with limit", func(t *testing.T) { + op := NewStartSampleStreamProcessor("proc1").Limit(100) + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 2) + assert.Equal(t, "startSampleStreamProcessor", gotD[0].Key) + assert.Equal(t, "limit", gotD[1].Key) + assert.Equal(t, int32(100), gotD[1].Value) + }) +} + +func TestGetMoreSampleStreamProcessor_Command(t *testing.T) { + op := NewGetMoreSampleStreamProcessor("proc1", 42).BatchSize(50) + got := buildSPCommand(t, func(dst []byte) ([]byte, error) { + return op.command(dst, description.SelectedServer{}) + }) + gotD := bson.D{} + require.NoError(t, bson.Unmarshal(got, &gotD)) + require.Len(t, gotD, 3) + assert.Equal(t, "getMoreSampleStreamProcessor", gotD[0].Key) + assert.Equal(t, "proc1", gotD[0].Value) + assert.Equal(t, "cursorId", gotD[1].Key) + assert.Equal(t, int64(42), gotD[1].Value) + assert.Equal(t, "batchSize", gotD[2].Key) + assert.Equal(t, int32(50), gotD[2].Value) +} + +func TestStartSampleStreamProcessor_ParseResponse(t *testing.T) { + resp := mustBSON(t, bson.D{{Key: "ok", Value: 1.0}, {Key: "cursorId", Value: int64(99)}}) + op := NewStartSampleStreamProcessor("proc1") + require.NoError(t, op.processResponse(nil, resp, driver.ResponseInfo{})) + assert.Equal(t, int64(99), op.CursorID()) +} + +func TestGetMoreSampleStreamProcessor_ParseResponse(t *testing.T) { + doc1 := mustBSON(t, bson.D{{Key: "x", Value: 1}}) + doc2 := mustBSON(t, bson.D{{Key: "x", Value: 2}}) + resp := mustBSON(t, bson.D{ + {Key: "ok", Value: 1.0}, + {Key: "cursorId", Value: int64(0)}, // exhausted + {Key: "nextBatch", Value: bson.A{ + bson.Raw(doc1), + bson.Raw(doc2), + }}, + }) + op := NewGetMoreSampleStreamProcessor("proc1", 42) + require.NoError(t, op.processResponse(nil, resp, driver.ResponseInfo{})) + assert.Equal(t, int64(0), op.ResultCursorID()) + require.Len(t, op.ResultBatch(), 2) +} From 3d3ea9cbafbd213cd453680c316c30415a43b2a5 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Wed, 13 May 2026 16:47:17 -0500 Subject: [PATCH 2/5] Fix unconvert lint. --- mongo/streamprocessing_processors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongo/streamprocessing_processors.go b/mongo/streamprocessing_processors.go index a8ba12f393..9c46e6f3a5 100644 --- a/mongo/streamprocessing_processors.go +++ b/mongo/streamprocessing_processors.go @@ -60,7 +60,7 @@ func (sps *StreamProcessors) Create( } defer releaseSession() - op := operation.NewCreateStreamProcessor(name, bsoncore.Document(pipelineArr)) + op := operation.NewCreateStreamProcessor(name, pipelineArr) if args.DLQ != nil { op = op.DLQ(bsoncore.Document(args.DLQ)) } From 563e92bea8a9694c81a9968b1c7bc33ab8fc4321 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Thu, 14 May 2026 11:04:23 -0500 Subject: [PATCH 3/5] fix broken operations --- mongo/streamprocessing_processor.go | 38 +++++----- mongo/streamprocessing_test.go | 70 +++++++++++++++++++ .../get_more_sample_stream_processor.go | 25 ++++--- .../driver/operation/stream_processor_test.go | 31 ++++---- 4 files changed, 123 insertions(+), 41 deletions(-) diff --git a/mongo/streamprocessing_processor.go b/mongo/streamprocessing_processor.go index 40659de9c4..b2034a956b 100644 --- a/mongo/streamprocessing_processor.go +++ b/mongo/streamprocessing_processor.go @@ -195,35 +195,31 @@ func (sp *StreamProcessor) Stats( // getStreamProcessor. // // Server-internal fields (tenantID, projectId, processorId, …) are not -// surfaced. Unknown fields are tolerated and ignored. +// surfaced. Unknown fields on the wire are tolerated and ignored. type StreamProcessorInfo struct { - ID string `bson:"id"` - Name string `bson:"name"` - State string `bson:"state"` - Pipeline []bson.Raw `bson:"pipeline"` - PipelineVersion int32 `bson:"pipelineVersion"` - Tier string `bson:"tier,omitempty"` - DLQ bson.Raw `bson:"dlq,omitempty"` - StreamMetaFieldName string `bson:"streamMetaFieldName,omitempty"` - EnableAutoScaling bool `bson:"enableAutoScaling"` - FailoverEnabled bool `bson:"failoverEnabled"` - ActiveRegion string `bson:"activeRegion"` - WorkspaceDefaultRegion string `bson:"workspaceDefaultRegion"` - LastStateChange *time.Time `bson:"lastStateChange,omitempty"` - LastModifiedAt *time.Time `bson:"lastModifiedAt,omitempty"` - ModifiedBy string `bson:"modifiedBy"` - HasStarted bool `bson:"hasStarted"` - ErrorMsg string `bson:"errorMsg"` - ErrorRetryable bool `bson:"errorRetryable"` - ErrorCode *int32 `bson:"errorCode,omitempty"` + Name string `bson:"name"` + State string `bson:"state"` + Pipeline []bson.Raw `bson:"pipeline"` + LastStateChange *time.Time `bson:"lastStateChange,omitempty"` + ErrorMsg string `bson:"errorMsg"` } func parseStreamProcessorInfo(raw bson.Raw, bsonOpts *options.BSONOptions, reg *bson.Registry) (*StreamProcessorInfo, error) { if len(raw) == 0 { return nil, errors.New("empty getStreamProcessor response") } + // The current server wraps the processor document inside a "result" + // sub-document; the spec describes the fields at the top level. Decode + // from "result" if present, else fall back to the top-level document so + // the driver works against either shape. + target := raw + if sub, err := raw.LookupErr("result"); err == nil { + if doc, ok := sub.DocumentOK(); ok { + target = bson.Raw(doc) + } + } info := new(StreamProcessorInfo) - dec := getDecoder(raw, bsonOpts, reg) + dec := getDecoder(target, bsonOpts, reg) if err := dec.Decode(info); err != nil { return nil, err } diff --git a/mongo/streamprocessing_test.go b/mongo/streamprocessing_test.go index 9c5a995862..e112e054e6 100644 --- a/mongo/streamprocessing_test.go +++ b/mongo/streamprocessing_test.go @@ -10,6 +10,7 @@ import ( "crypto/tls" "testing" + "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/internal/assert" "go.mongodb.org/mongo-driver/v2/internal/require" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -67,3 +68,72 @@ func TestIsStreamProcessingHost(t *testing.T) { }) } } + +func TestParseStreamProcessorInfo(t *testing.T) { + // Reusable processor sub-document. + procDoc := func(t *testing.T) bson.Raw { + t.Helper() + raw, err := bson.Marshal(bson.D{ + {Key: "name", Value: "proc1"}, + {Key: "state", Value: "STARTED"}, + {Key: "errorMsg", Value: ""}, + }) + require.NoError(t, err) + return raw + } + + t.Run("wrapped in result (current server)", func(t *testing.T) { + raw, err := bson.Marshal(bson.D{ + {Key: "result", Value: bson.Raw(procDoc(t))}, + {Key: "ok", Value: 1.0}, + }) + require.NoError(t, err) + + info, err := parseStreamProcessorInfo(raw, nil, nil) + require.NoError(t, err) + assert.Equal(t, "proc1", info.Name) + assert.Equal(t, "STARTED", info.State) + }) + + t.Run("top-level fields (spec shape)", func(t *testing.T) { + raw, err := bson.Marshal(bson.D{ + {Key: "ok", Value: 1.0}, + {Key: "name", Value: "proc1"}, + {Key: "state", Value: "STARTED"}, + {Key: "errorMsg", Value: ""}, + }) + require.NoError(t, err) + + info, err := parseStreamProcessorInfo(raw, nil, nil) + require.NoError(t, err) + assert.Equal(t, "proc1", info.Name) + assert.Equal(t, "STARTED", info.State) + }) + + t.Run("unknown fields tolerated", func(t *testing.T) { + // Includes server-internal fields the spec says drivers MUST NOT + // surface (processorId, tenantID, projectId). The decode should + // ignore them. + raw, err := bson.Marshal(bson.D{ + {Key: "result", Value: bson.D{ + {Key: "name", Value: "proc1"}, + {Key: "state", Value: "STARTED"}, + {Key: "processorId", Value: "internal-id"}, + {Key: "tenantID", Value: "internal-tenant"}, + {Key: "projectId", Value: "internal-project"}, + {Key: "futureField", Value: "value"}, + }}, + {Key: "ok", Value: 1.0}, + }) + require.NoError(t, err) + + info, err := parseStreamProcessorInfo(raw, nil, nil) + require.NoError(t, err) + assert.Equal(t, "proc1", info.Name) + }) + + t.Run("empty raw returns error", func(t *testing.T) { + _, err := parseStreamProcessorInfo(nil, nil, nil) + require.Error(t, err) + }) +} diff --git a/x/mongo/driver/operation/get_more_sample_stream_processor.go b/x/mongo/driver/operation/get_more_sample_stream_processor.go index d13eb8495f..55c6f73e1a 100644 --- a/x/mongo/driver/operation/get_more_sample_stream_processor.go +++ b/x/mongo/driver/operation/get_more_sample_stream_processor.go @@ -20,8 +20,13 @@ import ( ) // GetMoreSampleStreamProcessor performs a getMoreSampleStreamProcessor -// operation. The response carries a cursorId (0 means exhausted) and a -// nextBatch of documents. +// operation. The response carries a cursorId (0 means exhausted) and a batch +// of documents. +// +// The batch field name is forgiving: the parser reads "messages" (what the +// server currently emits) and falls back to the spec's "nextBatch" name, so +// the driver works against today's server and any future server that aligns +// with the spec. type GetMoreSampleStreamProcessor struct { authenticator driver.Authenticator session *session.Client @@ -67,23 +72,27 @@ func (op *GetMoreSampleStreamProcessor) processResponse(_ context.Context, resp op.resultCursorID = id op.resultBatch = op.resultBatch[:0] - batchVal, err := resp.LookupErr("nextBatch") + // Accept either "messages" (current server) or "nextBatch" (spec). + batchVal, err := resp.LookupErr("messages") if err != nil { - // nextBatch may be absent for an exhausted cursor; treat as empty. - return nil + batchVal, err = resp.LookupErr("nextBatch") + if err != nil { + // Batch may be absent for an exhausted cursor; treat as empty. + return nil + } } arr, ok := batchVal.ArrayOK() if !ok { - return errors.New("getMoreSampleStreamProcessor response nextBatch is not an array") + return errors.New("getMoreSampleStreamProcessor response batch field is not an array") } vals, err := bsoncore.Document(arr).Elements() if err != nil { - return fmt.Errorf("getMoreSampleStreamProcessor response nextBatch is malformed: %w", err) + return fmt.Errorf("getMoreSampleStreamProcessor response batch field is malformed: %w", err) } for _, elem := range vals { doc, ok := elem.Value().DocumentOK() if !ok { - return errors.New("getMoreSampleStreamProcessor response nextBatch element is not a document") + return errors.New("getMoreSampleStreamProcessor response batch element is not a document") } op.resultBatch = append(op.resultBatch, doc) } diff --git a/x/mongo/driver/operation/stream_processor_test.go b/x/mongo/driver/operation/stream_processor_test.go index dc834c7408..db0abe63c9 100644 --- a/x/mongo/driver/operation/stream_processor_test.go +++ b/x/mongo/driver/operation/stream_processor_test.go @@ -306,16 +306,23 @@ func TestStartSampleStreamProcessor_ParseResponse(t *testing.T) { func TestGetMoreSampleStreamProcessor_ParseResponse(t *testing.T) { doc1 := mustBSON(t, bson.D{{Key: "x", Value: 1}}) doc2 := mustBSON(t, bson.D{{Key: "x", Value: 2}}) - resp := mustBSON(t, bson.D{ - {Key: "ok", Value: 1.0}, - {Key: "cursorId", Value: int64(0)}, // exhausted - {Key: "nextBatch", Value: bson.A{ - bson.Raw(doc1), - bson.Raw(doc2), - }}, - }) - op := NewGetMoreSampleStreamProcessor("proc1", 42) - require.NoError(t, op.processResponse(nil, resp, driver.ResponseInfo{})) - assert.Equal(t, int64(0), op.ResultCursorID()) - require.Len(t, op.ResultBatch(), 2) + + // The driver accepts either "messages" (current server) or "nextBatch" + // (spec) as the batch field name. Verify both shapes parse. + for _, field := range []string{"messages", "nextBatch"} { + t.Run(field, func(t *testing.T) { + resp := mustBSON(t, bson.D{ + {Key: "ok", Value: 1.0}, + {Key: "cursorId", Value: int64(0)}, // exhausted + {Key: field, Value: bson.A{ + bson.Raw(doc1), + bson.Raw(doc2), + }}, + }) + op := NewGetMoreSampleStreamProcessor("proc1", 42) + require.NoError(t, op.processResponse(nil, resp, driver.ResponseInfo{})) + assert.Equal(t, int64(0), op.ResultCursorID()) + require.Len(t, op.ResultBatch(), 2) + }) + } } From 522386a5d922c1d34e32213b0063737418879803 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Thu, 14 May 2026 11:04:36 -0500 Subject: [PATCH 4/5] add testing script to verify driver changes work. --- internal/cmd/testasp/main.go | 261 +++++++++++++++++++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 internal/cmd/testasp/main.go diff --git a/internal/cmd/testasp/main.go b/internal/cmd/testasp/main.go new file mode 100644 index 0000000000..e662ba8d6c --- /dev/null +++ b/internal/cmd/testasp/main.go @@ -0,0 +1,261 @@ +// Copyright (C) MongoDB, Inc. 2026-present. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +// testasp is a small smoke test for the streamprocessing client. It connects +// to an Atlas Stream Processing workspace, creates a processor reading from +// the sample_stream_solar source, samples some documents, fetches stats, and +// stops the processor. The processor is always dropped on exit. +// +// Usage: +// +// go run ./internal/cmd/testasp \ +// -uri mongodb://atlas-stream--..a.query.mongodb.net/ \ +// -username \ +// -password +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/internal/uuid" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/streamprocessing" +) + +func main() { + uri := flag.String("uri", "", "workspace endpoint (mongodb://atlas-stream-...)") + username := flag.String("username", "", "username for SCRAM auth") + password := flag.String("password", "", "password for SCRAM auth") + sampleLimit := flag.Int("sample-limit", 20, "max documents to sample on the initial call") + settleSeconds := flag.Int("settle", 3, "seconds to wait after Start before sampling") + samplePolls := flag.Int("sample-polls", 10, "max getMore polls before giving up on samples") + samplePollInterval := flag.Duration("sample-poll-interval", 2*time.Second, "wait between getMore polls") + sampleTarget := flag.Int("sample-target", 5, "stop polling once this many documents have been collected") + verbose := flag.Bool("verbose", false, "include per-operator detail in stats") + sinkConn := flag.String("sink-connection", "inny", "Atlas cluster connection name to use as the $merge sink") + sinkDB := flag.String("sink-db", "testasp", "database name passed to $merge.into.db") + sinkColl := flag.String("sink-collection", "solar_output", "collection name passed to $merge.into.coll") + apm := flag.Bool("apm", false, "print wire command/reply payloads for ASP commands") + flag.Parse() + + if *uri == "" || *username == "" || *password == "" { + fmt.Fprintln(os.Stderr, "all of -uri, -username, -password are required") + flag.Usage() + os.Exit(2) + } + + // Top-level context honors Ctrl-C so cleanup still runs. + rootCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + clientOpts := options.Client(). + ApplyURI(*uri). + SetAuth(options.Credential{Username: *username, Password: *password}) + + if *apm { + clientOpts = clientOpts.SetMonitor(&event.CommandMonitor{ + Started: func(_ context.Context, e *event.CommandStartedEvent) { + if !isASPCommand(e.CommandName) { + return + } + log.Printf("APM > %s: %s", e.CommandName, oneline(bson.Raw(e.Command))) + }, + Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) { + if !isASPCommand(e.CommandName) { + return + } + log.Printf("APM < %s: %s", e.CommandName, oneline(bson.Raw(e.Reply))) + }, + Failed: func(_ context.Context, e *event.CommandFailedEvent) { + if !isASPCommand(e.CommandName) { + return + } + log.Printf("APM x %s: %v", e.CommandName, e.Failure) + }, + }) + } + + client, err := streamprocessing.Connect(clientOpts) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer func() { + discCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := client.Disconnect(discCtx); err != nil { + log.Printf("disconnect: %v", err) + } + }() + + sps := client.StreamProcessors() + id, err := uuid.New() + if err != nil { + log.Fatalf("uuid: %v", err) + } + name := fmt.Sprintf("testasp-%s", id) + log.Printf("processor name: %s", name) + + // Always drop on exit. Use background context so cleanup still runs after + // rootCtx has been cancelled. + defer func() { + dropCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if err := sps.Get(name).Drop(dropCtx); err != nil { + log.Printf("drop (cleanup): %v", err) + } else { + log.Printf("dropped processor %s", name) + } + }() + + pipeline := []bson.D{ + {{Key: "$source", Value: bson.D{{Key: "connectionName", Value: "sample_stream_solar"}}}}, + {{Key: "$merge", Value: bson.D{ + {Key: "into", Value: bson.D{ + {Key: "connectionName", Value: *sinkConn}, + {Key: "db", Value: *sinkDB}, + {Key: "coll", Value: *sinkColl}, + }}, + }}}, + } + log.Printf("pipeline sink: connection=%s db=%s coll=%s", *sinkConn, *sinkDB, *sinkColl) + + createCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + if err := sps.Create(createCtx, name, pipeline); err != nil { + log.Fatalf("create: %v", err) + } + log.Printf("created") + + sp := sps.Get(name) + startCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + if err := sp.Start(startCtx, nil); err != nil { + log.Fatalf("start: %v", err) + } + log.Printf("started; settling for %ds", *settleSeconds) + time.Sleep(time.Duration(*settleSeconds) * time.Second) + + // Exercise getStreamProcessor and log the parsed info so we notice if any + // spec-described fields are missing from the server response. + infoCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + info, err := sps.GetInfo(infoCtx, name) + if err != nil { + log.Printf("getInfo: %v", err) + } else { + var lastChange string + if info.LastStateChange != nil { + lastChange = info.LastStateChange.Format(time.RFC3339) + } + log.Printf("info: name=%q state=%q pipelineStages=%d lastStateChange=%q errorMsg=%q", + info.Name, info.State, len(info.Pipeline), lastChange, info.ErrorMsg) + } + + // Open the sample cursor. The ASP sample cursor is a tailable tap: it + // carries documents that arrive while it is polled, so an initial call + // often returns empty and we need to poll repeatedly. + sampleCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + res, err := sp.GetStreamProcessorSamples(sampleCtx, + options.GetStreamProcessorSamples().SetLimit(int32(*sampleLimit))) + if err != nil { + log.Fatalf("sample (initial): %v", err) + } + collected := append([]bson.Raw(nil), res.Documents...) + log.Printf("initial batch: %d document(s); cursor=%d", len(res.Documents), res.CursorID) + for i, d := range res.Documents { + log.Printf(" [%d] %s", i, oneline(d)) + } + + cursorID := res.CursorID + for poll := 1; poll <= *samplePolls && cursorID != 0 && len(collected) < *sampleTarget; poll++ { + select { + case <-rootCtx.Done(): + log.Printf("interrupted; stopping sample polling") + cursorID = 0 + case <-time.After(*samplePollInterval): + } + if cursorID == 0 { + break + } + + pollCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + more, err := sp.GetStreamProcessorSamples(pollCtx, + options.GetStreamProcessorSamples(). + SetCursorID(cursorID). + SetBatchSize(5)) + cancel() + if err != nil { + log.Printf("sample (poll %d): %v", poll, err) + break + } + cursorID = more.CursorID + collected = append(collected, more.Documents...) + log.Printf("poll %d: %d new document(s); total=%d; cursor=%d", + poll, len(more.Documents), len(collected), cursorID) + for i, d := range more.Documents { + log.Printf(" [%d] %s", len(collected)-len(more.Documents)+i, oneline(d)) + } + } + log.Printf("sampling complete: %d document(s) collected", len(collected)) + + // Stats. + statsCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + var statsOpts []options.Lister[options.GetStreamProcessorStatsOptions] + if *verbose { + statsOpts = append(statsOpts, options.GetStreamProcessorStats().SetVerbose(true)) + } + stats, err := sp.Stats(statsCtx, statsOpts...) + if err != nil { + log.Printf("stats: %v", err) + } else { + log.Printf("stats: %s", oneline(stats)) + } + + // Stop. + stopCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) + defer cancel() + if err := sp.Stop(stopCtx); err != nil { + log.Fatalf("stop: %v", err) + } + log.Printf("stopped") +} + +// oneline renders a BSON document as compact extended JSON. +func oneline(raw bson.Raw) string { + s, err := bson.MarshalExtJSON(raw, false, false) + if err != nil { + return fmt.Sprintf("", err) + } + return string(s) +} + +// isASPCommand reports whether the command name belongs to the Atlas Stream +// Processing wire protocol. +func isASPCommand(name string) bool { + switch name { + case "createStreamProcessor", + "startStreamProcessor", + "stopStreamProcessor", + "dropStreamProcessor", + "getStreamProcessor", + "getStreamProcessorStats", + "startSampleStreamProcessor", + "getMoreSampleStreamProcessor": + return true + } + return false +} From 8a803fcd0e7b6a53f4ad243def4aaa8dbaac5197 Mon Sep 17 00:00:00 2001 From: nickpoindexter Date: Thu, 14 May 2026 11:17:29 -0500 Subject: [PATCH 5/5] fix lint issues --- internal/cmd/testasp/main.go | 24 ++++++++++++++++-------- mongo/streamprocessing_processor.go | 2 +- mongo/streamprocessing_test.go | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/cmd/testasp/main.go b/internal/cmd/testasp/main.go index e662ba8d6c..d9bdc71cab 100644 --- a/internal/cmd/testasp/main.go +++ b/internal/cmd/testasp/main.go @@ -35,6 +35,13 @@ import ( ) func main() { + if err := run(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func run() error { uri := flag.String("uri", "", "workspace endpoint (mongodb://atlas-stream-...)") username := flag.String("username", "", "username for SCRAM auth") password := flag.String("password", "", "password for SCRAM auth") @@ -70,13 +77,13 @@ func main() { if !isASPCommand(e.CommandName) { return } - log.Printf("APM > %s: %s", e.CommandName, oneline(bson.Raw(e.Command))) + log.Printf("APM > %s: %s", e.CommandName, oneline(e.Command)) }, Succeeded: func(_ context.Context, e *event.CommandSucceededEvent) { if !isASPCommand(e.CommandName) { return } - log.Printf("APM < %s: %s", e.CommandName, oneline(bson.Raw(e.Reply))) + log.Printf("APM < %s: %s", e.CommandName, oneline(e.Reply)) }, Failed: func(_ context.Context, e *event.CommandFailedEvent) { if !isASPCommand(e.CommandName) { @@ -89,7 +96,7 @@ func main() { client, err := streamprocessing.Connect(clientOpts) if err != nil { - log.Fatalf("connect: %v", err) + return fmt.Errorf("connect: %w", err) } defer func() { discCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -102,7 +109,7 @@ func main() { sps := client.StreamProcessors() id, err := uuid.New() if err != nil { - log.Fatalf("uuid: %v", err) + return fmt.Errorf("uuid: %w", err) } name := fmt.Sprintf("testasp-%s", id) log.Printf("processor name: %s", name) @@ -134,7 +141,7 @@ func main() { createCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) defer cancel() if err := sps.Create(createCtx, name, pipeline); err != nil { - log.Fatalf("create: %v", err) + return fmt.Errorf("create: %w", err) } log.Printf("created") @@ -142,7 +149,7 @@ func main() { startCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) defer cancel() if err := sp.Start(startCtx, nil); err != nil { - log.Fatalf("start: %v", err) + return fmt.Errorf("start: %w", err) } log.Printf("started; settling for %ds", *settleSeconds) time.Sleep(time.Duration(*settleSeconds) * time.Second) @@ -171,7 +178,7 @@ func main() { res, err := sp.GetStreamProcessorSamples(sampleCtx, options.GetStreamProcessorSamples().SetLimit(int32(*sampleLimit))) if err != nil { - log.Fatalf("sample (initial): %v", err) + return fmt.Errorf("sample (initial): %w", err) } collected := append([]bson.Raw(nil), res.Documents...) log.Printf("initial batch: %d document(s); cursor=%d", len(res.Documents), res.CursorID) @@ -229,9 +236,10 @@ func main() { stopCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second) defer cancel() if err := sp.Stop(stopCtx); err != nil { - log.Fatalf("stop: %v", err) + return fmt.Errorf("stop: %w", err) } log.Printf("stopped") + return nil } // oneline renders a BSON document as compact extended JSON. diff --git a/mongo/streamprocessing_processor.go b/mongo/streamprocessing_processor.go index b2034a956b..87fc856704 100644 --- a/mongo/streamprocessing_processor.go +++ b/mongo/streamprocessing_processor.go @@ -215,7 +215,7 @@ func parseStreamProcessorInfo(raw bson.Raw, bsonOpts *options.BSONOptions, reg * target := raw if sub, err := raw.LookupErr("result"); err == nil { if doc, ok := sub.DocumentOK(); ok { - target = bson.Raw(doc) + target = doc } } info := new(StreamProcessorInfo) diff --git a/mongo/streamprocessing_test.go b/mongo/streamprocessing_test.go index e112e054e6..1af6bae58b 100644 --- a/mongo/streamprocessing_test.go +++ b/mongo/streamprocessing_test.go @@ -84,7 +84,7 @@ func TestParseStreamProcessorInfo(t *testing.T) { t.Run("wrapped in result (current server)", func(t *testing.T) { raw, err := bson.Marshal(bson.D{ - {Key: "result", Value: bson.Raw(procDoc(t))}, + {Key: "result", Value: procDoc(t)}, {Key: "ok", Value: 1.0}, }) require.NoError(t, err)