Skip to content

Add Atlas Stream Processing client.#2388

Draft
nickpoindexter wants to merge 5 commits into
mongodb:masterfrom
nickpoindexter:add_asp_commands
Draft

Add Atlas Stream Processing client.#2388
nickpoindexter wants to merge 5 commits into
mongodb:masterfrom
nickpoindexter:add_asp_commands

Conversation

@nickpoindexter
Copy link
Copy Markdown

Summary

Adds first-class driver support for Atlas Stream Processing (ASP). Users
can now connect to a workspace endpoint with a dedicated
streamprocessing.Client and manage stream processors through typed
methods, instead of dispatching commands by hand via RunCommand.

Implementation follows the Atlas Stream Processing driver spec:

  • New top-level streamprocessing package (go.mongodb.org/mongo-driver/v2/streamprocessing)
    with thin type aliases over the new types in the mongo package.
  • 8 hand-rolled operations under x/mongo/driver/operation/ for the ASP
    wire commands: createStreamProcessor, startStreamProcessor,
    stopStreamProcessor, dropStreamProcessor, getStreamProcessor,
    getStreamProcessorStats, startSampleStreamProcessor,
    getMoreSampleStreamProcessor.
  • New option types under mongo/options/streamprocessingoptions.go.
  • Runnable example at examples/stream_processing/.

The deferred commands listed in the spec (modifyStreamProcessor,
listStreamProcessors, listStreamConnections, processStreamProcessor,
listWorkspaceDefaults) are intentionally out of scope here and will land
in follow-up work.

Public API change

This change adds new public surface but does not modify or remove
anything existing. The new exported identifiers:

  • streamprocessing.{Client,StreamProcessors,StreamProcessor,Info,SamplesResult,Connect,IsWorkspaceHost} (type aliases + factory).
  • mongo.{StreamProcessingClient,StreamProcessors,StreamProcessor,StreamProcessorInfo,GetStreamProcessorSamplesResult,ConnectStreamProcessing,IsStreamProcessingHost}.
  • mongo/options.{CreateStreamProcessor*,StartStreamProcessor*,StreamProcessorFailover*,GetStreamProcessorStats*,GetStreamProcessorSamples*} (option types + builders).
  • 8 new operations in x/mongo/driver/operation/ (importable but not semver-covered).

task api-report output:

<paste gorelease output here>

Spec compliance notes

A few requirements from the spec that are worth flagging during review:

  • TLS is forced on. streamprocessing.Connect sets a default
    *tls.Config if the caller didn't, per: "Drivers MUST enable TLS by
    default when connecting to a workspace endpoint and MUST NOT allow TLS
    to be disabled."
  • authSource defaults to admin when credentials are supplied
    without one, per: "Drivers MUST set authSource=admin as the default
    authentication database for workspace connections."
  • Lifecycle commands are not retryable (create/start/stop/drop
    and both sample-cursor commands), per the spec's retryability table
    and the rationale that "stream processor state transitions … are not
    idempotent."
    Only getStreamProcessor and getStreamProcessorStats
    are configured as retryable reads.
  • startAfter is not exposed — the spec reserves it: "RESERVED for
    future use; not yet accepted by the server. Drivers MUST NOT send this
    field until a future revision of this spec enables it."
  • Sample cursor first-call behaviour. GetStreamProcessorSamples
    with no CursorID issues startSampleStreamProcessor followed by an
    immediate getMoreSampleStreamProcessor, so the caller gets documents
    on the first call, per the spec rationale.
  • Unknown response fields are tolerated. StreamProcessorInfo
    decodes into a typed struct that ignores extras; stats responses are
    returned as raw bson.Raw for the same reason. Server-internal fields
    (tenantID, projectId, processorId, …) are never surfaced.
  • state is surfaced as a string, not mapped to a Go enum, so
    future server states pass through untouched.

Test plan

  • task fmt clean.
  • task build (includes compilecheck-119).
  • task test-short passes for the new packages (mongo/options,
    x/mongo/driver/operation, internal/integration/streamprocessing).
    Pre-existing spec-test failures are unrelated (require
    task init-submodule).
  • task lint.
  • task api-report; output above.
  • Manual integration against a real workspace:
    MONGODB_STREAM_PROCESSING_URI=<workspace-uri> go test ./internal/integration/streamprocessing/...
    exercises create → start → stats → sample → stop → drop end-to-end.

Out of scope / follow-ups

  • modifyStreamProcessor, listStreamProcessors, listStreamConnections,
    processStreamProcessor, listWorkspaceDefaults — deferred by spec.
  • UTF YAML spec tests — the spec says these will land alongside this PoC
    in a separate revision.
  • v1 backport — feature targets master (v2) only.

Background & Motivation

@github-actions github-actions Bot added the documentation Pull requests that update documentation or examples label May 13, 2026
Comment thread mongo/streamprocessing.go
// merged ClientOptions.
func applyStreamProcessingDefaults(opts *options.ClientOptions) {
if opts.TLSConfig == nil {
opts.TLSConfig = &tls.Config{}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified an issue in your code:

opts.TLSConfig is created without MinVersion in applyStreamProcessingDefaults, allowing TLS 1.2 for StreamProcessing connections. This enables downgrade to TLS 1.2 (e.g., via a TLS‑inspection proxy) and exposes request/response data to interception.

More details about this

applyStreamProcessingDefaults sets opts.TLSConfig = &tls.Config{} without a MinVersion, so ConnectStreamProcessing may negotiate TLS 1.2. This widens the protocol surface for all StreamProcessing connections instead of requiring TLS 1.3.

Concrete exploit scenario:

  • An on‑path adversary (e.g., a corporate TLS‑inspection proxy) poisons DNS for atlas-stream-*.a.query.mongodb.net and routes traffic through a proxy that only offers TLS 1.2 but presents a trusted enterprise root-signed certificate. Because opts.TLSConfig lacks MinVersion, the client accepts a TLS 1.2 handshake.
  • The proxy then terminates TLS 1.2 and re-encrypts upstream, allowing it to read or modify StreamProcessing requests and responses, including credentials and stream processor configurations handled by ConnectStreamProcessing and subsequent API calls via spc.client.

Code reference: In applyStreamProcessingDefaults, when opts.TLSConfig is nil it is replaced with &tls.Config{}; no MinVersion is set anywhere, so every StreamProcessingClient created by ConnectStreamProcessing inherits this permissive TLS policy.

To resolve this comment:

✨ Commit fix suggestion
  1. Update the TLS configuration to set the minimum version to TLS 1.3 by changing opts.TLSConfig = &tls.Config{} to opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS13}.
  2. This enforces the use of TLS 1.3 for connections, preventing clients from negotiating older, less secure protocols.
💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by missing-ssl-minversion.

🛟 Help? Slack #semgrep-help or go/semgrep-help.

Resolution Options:

  • Fix the code
  • Reply /fp $reason (if security gap doesn’t exist)
  • Reply /ar $reason (if gap is valid but intentional; add mitigations/monitoring)
  • Reply /other $reason (e.g., test-only)

You can view more details about this finding in the Semgrep AppSec Platform.

@mongodb-drivers-pr-bot
Copy link
Copy Markdown
Contributor

mongodb-drivers-pr-bot Bot commented May 13, 2026

🧪 Performance Results

Commit SHA: 8a803fc

The following benchmark tests for version 6a05f5a2218175000743ad12 had statistically significant changes (i.e., |z-score| > 1.96):

Benchmark Measurement % Change Patch Value Stable Region H-Score Z-Score
BenchmarkSingleRunCommand ops_per_second_min -37.4716 965.9475 Avg: 1544.8140
Med: 1587.7199
Stdev: 245.3113
0.8021 -2.3597
BenchmarkSingleRunCommand total_mem_allocs -10.1102 1062251.0000 Avg: 1181725.2000
Med: 1194156.0000
Stdev: 23323.4230
0.9082 -5.1225
BenchmarkSingleRunCommand total_bytes_allocated -10.1100 99011520.0000 Avg: 110147400.0000
Med: 111321920.0000
Stdev: 2170624.8712
0.9086 -5.1303
BenchmarkSingleRunCommand total_time_seconds -6.1939 1.0778 Avg: 1.1490
Med: 1.1414
Stdev: 0.0299
0.7780 -2.3786
BenchmarkBSONDeepDocumentDecoding total_mem_allocs -3.8722 12937215.0000 Avg: 13458352.8333
Med: 13541502.5000
Stdev: 198722.3513
0.8163 -2.6224
BenchmarkBSONDeepDocumentDecoding total_bytes_allocated -3.8638 281074608.0000 Avg: 292371264.0000
Med: 294171308.0000
Stdev: 4307574.0534
0.8163 -2.6225
BenchmarkBSONDeepDocumentDecoding ops_per_second_med -3.3629 16476.9076 Avg: 17050.2983
Med: 17167.3918
Stdev: 236.5464
0.8066 -2.4240
BenchmarkBSONDeepDocumentEncoding total_time_seconds 2.2337 1.2207 Avg: 1.1940
Med: 1.1944
Stdev: 0.0093
0.8217 2.8547
BenchmarkBSONFullDocumentDecoding allocated_bytes_per_op -0.0388 25320.0000 Avg: 25329.8333
Med: 25330.0000
Stdev: 0.4082
0.9859 -24.0866

For a comprehensive view of all microbenchmark results for this PR's commit, please check out the Evergreen perf task for this patch.

@mongodb-drivers-pr-bot
Copy link
Copy Markdown
Contributor

API Change Report

./v2/mongo

compatible changes

ConnectStreamProcessing: added
GetStreamProcessorSamplesResult: added
IsStreamProcessingHost: added
StreamProcessingClient: added
StreamProcessor: added
StreamProcessorInfo: added
StreamProcessors: added

./v2/mongo/options

compatible changes

CreateStreamProcessor: added
CreateStreamProcessorOptions: added
CreateStreamProcessorOptionsBuilder: added
GetStreamProcessorSamples: added
GetStreamProcessorSamplesOptions: added
GetStreamProcessorSamplesOptionsBuilder: added
GetStreamProcessorStats: added
GetStreamProcessorStatsOptions: added
GetStreamProcessorStatsOptionsBuilder: added
StartStreamProcessor: added
StartStreamProcessorOptions: added
StartStreamProcessorOptionsBuilder: added
StreamProcessorFailover: added
StreamProcessorFailoverOptions: added
StreamProcessorFailoverOptionsBuilder: added

./v2/streamprocessing

compatible changes

package added

./v2/x/mongo/driver/operation

compatible changes

CreateStreamProcessor: added
DropStreamProcessor: added
GetMoreSampleStreamProcessor: added
GetStreamProcessor: added
GetStreamProcessorStats: added
NewCreateStreamProcessor: added
NewDropStreamProcessor: added
NewGetMoreSampleStreamProcessor: added
NewGetStreamProcessor: added
NewGetStreamProcessorStats: added
NewStartSampleStreamProcessor: added
NewStartStreamProcessor: added
NewStopStreamProcessor: added
StartSampleStreamProcessor: added
StartStreamProcessor: added
StopStreamProcessor: added

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Pull requests that update documentation or examples

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant