Skip to content

GH-3188 Add Kafka Streams record interceptor#3197

Open
nikita-kibitkin wants to merge 1 commit into
spring-cloud:mainfrom
nikita-kibitkin:gh-3188-kafka-streams-record-interceptor
Open

GH-3188 Add Kafka Streams record interceptor#3197
nikita-kibitkin wants to merge 1 commit into
spring-cloud:mainfrom
nikita-kibitkin:gh-3188-kafka-streams-record-interceptor

Conversation

@nikita-kibitkin
Copy link
Copy Markdown

Resolves #3188

Adds a Kafka Streams-specific KafkaStreamsRecordInterceptor hook for inbound KStream records, giving logging, metrics, tracing, and similar cross-cutting concerns a single place to observe records instead of repeating .peek() or .process() in every stream function.

The interceptor runs as a side-effect Processor API node in AbstractKafkaStreamsBinderProcessor.getKStream(), before event-type routing and before the user function receives the stream. It is observe-only: it does not mutate, filter, or deserialize records. Multiple interceptors are ordered via Spring's @Order, resolved through getBeanProvider(...).orderedStream(), so no separate composite type is needed.

Scoped to the Kafka Streams binder only, and kept separate from the broader per-binding hook in #3137 per the issue discussion.

Changes:

  • Add KafkaStreamsRecordInterceptor and KafkaStreamsRecordInterceptorContext
  • Apply ordered interceptors in AbstractKafkaStreamsBinderProcessor.getKStream()
  • Add tests for interceptor ordering and placement, and for the context accessors

Add a Kafka Streams-specific interceptor hook before event-type routing and user function processing.

Signed-off-by: Nikita Kibitkin <nikita.n.kibitkin@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add RecordInterceptor support for Kafka Streams binder

1 participant