diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index b2cd545814..541eedfacc 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -94,6 +94,7 @@ * @author Soby Chacko * @author Ralf Wiedmann * @author Gihong Park + * @author Nikita Kibitkin * @since 3.0.0 */ public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware { @@ -447,6 +448,7 @@ protected Serde getValueSerde(String inboundName, KafkaStreamsConsumerPropert stream = streamsBuilder.stream(Arrays.asList(bindingTargets), consumed); } + applyKafkaStreamsRecordInterceptors(inboundName, stream); //Check to see if event type based routing is enabled. //See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003 if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) { @@ -474,6 +476,37 @@ protected Serde getValueSerde(String inboundName, KafkaStreamsConsumerPropert return getkStream(bindingProperties, stream, nativeDecoding); } + private void applyKafkaStreamsRecordInterceptors(String inboundName, KStream stream) { + List kafkaStreamsRecordInterceptors = + this.applicationContext.getBeanProvider(KafkaStreamsRecordInterceptor.class).orderedStream().toList(); + if (!kafkaStreamsRecordInterceptors.isEmpty()) { + stream.process((ProcessorSupplier) () -> new Processor() { + + ProcessorContext context; + + @Override + public void init(ProcessorContext context) { + Processor.super.init(context); + this.context = context; + } + + @Override + public void process(Record record) { + KafkaStreamsRecordInterceptorContext interceptorContext = + new KafkaStreamsRecordInterceptorContext(inboundName, this.context.recordMetadata()); + for (KafkaStreamsRecordInterceptor interceptor : kafkaStreamsRecordInterceptors) { + interceptor.intercept(record, interceptorContext); + } + } + + @Override + public void close() { + Processor.super.close(); + } + }); + } + } + private Serde getValueSerdeToUse(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde valueSerde) { if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) { return kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ? valueSerde : new Serdes.BytesSerde(); diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptor.java new file mode 100644 index 0000000000..a522b616e7 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptor.java @@ -0,0 +1,38 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams; + +import org.apache.kafka.streams.processor.api.Record; + +/** + * Interceptor invoked for records consumed by the Kafka Streams binder before the + * user function sees the inbound {@code KStream}. + * + * @author Nikita Kibitkin + * @since 5.0.2 + */ +@FunctionalInterface +public interface KafkaStreamsRecordInterceptor { + + /** + * Intercept the inbound Kafka Streams record. + * @param record the inbound record + * @param context metadata about the intercepted record and binding + */ + void intercept(Record record, KafkaStreamsRecordInterceptorContext context); + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContext.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContext.java new file mode 100644 index 0000000000..fc123b54e0 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContext.java @@ -0,0 +1,67 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams; + +import java.util.Optional; + +import org.apache.kafka.streams.processor.api.RecordMetadata; + +import org.springframework.util.Assert; + +/** + * Context for a record intercepted by a {@link KafkaStreamsRecordInterceptor}. + * + * @author Nikita Kibitkin + * @since 5.0.2 + */ +public record KafkaStreamsRecordInterceptorContext(String bindingName, Optional recordMetadata) { + + /** + * Create a new {@link KafkaStreamsRecordInterceptorContext}. + * @param bindingName the inbound binding name + * @param recordMetadata metadata for the intercepted record + */ + public KafkaStreamsRecordInterceptorContext { + Assert.hasText(bindingName, "'bindingName' must not be empty"); + Assert.notNull(recordMetadata, "'recordMetadata' must not be null"); + } + + /** + * Return the source topic for the intercepted record, if available. + * @return the source topic + */ + public Optional topic() { + return this.recordMetadata.map(RecordMetadata::topic); + } + + /** + * Return the source partition for the intercepted record, if available. + * @return the source partition + */ + public Optional partition() { + return this.recordMetadata.map(RecordMetadata::partition); + } + + /** + * Return the source offset for the intercepted record, if available. + * @return the source offset + */ + public Optional offset() { + return this.recordMetadata.map(RecordMetadata::offset); + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContextTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContextTests.java new file mode 100644 index 0000000000..a41bea8ee7 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorContextTests.java @@ -0,0 +1,72 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams; + +import java.util.Optional; + +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Nikita Kibitkin + */ +class KafkaStreamsRecordInterceptorContextTests { + + @Test + void recordMetadataAccessorsReturnValuesWhenMetadataIsAvailable() { + KafkaStreamsRecordInterceptorContext context = new KafkaStreamsRecordInterceptorContext("process-in-0", + Optional.of(recordMetadata("foo", 1, 42L))); + + assertThat(context.topic()).contains("foo"); + assertThat(context.partition()).contains(1); + assertThat(context.offset()).contains(42L); + } + + @Test + void recordMetadataAccessorsReturnEmptyWhenMetadataIsUnavailable() { + KafkaStreamsRecordInterceptorContext context = new KafkaStreamsRecordInterceptorContext("process-in-0", + Optional.empty()); + + assertThat(context.topic()).isEmpty(); + assertThat(context.partition()).isEmpty(); + assertThat(context.offset()).isEmpty(); + } + + private RecordMetadata recordMetadata(String topic, int partition, long offset) { + return new RecordMetadata() { + + @Override + public String topic() { + return topic; + } + + @Override + public int partition() { + return partition; + } + + @Override + public long offset() { + return offset; + } + + }; + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorTests.java new file mode 100644 index 0000000000..939c40b31b --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsRecordInterceptorTests.java @@ -0,0 +1,199 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka.streams; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.core.annotation.Order; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Nikita Kibitkin + */ +@EmbeddedKafka(topics = {"interceptor-in", "interceptor-out"}) +class KafkaStreamsRecordInterceptorTests { + + private static final List INTERCEPTED_RECORDS = new CopyOnWriteArrayList<>(); + + private static CountDownLatch interceptorLatch; + + private static EmbeddedKafkaBroker embeddedKafka; + + private static Consumer consumer; + + @BeforeAll + public static void setUp() { + embeddedKafka = EmbeddedKafkaCondition.getBroker(); + Map consumerProps = KafkaTestUtils.consumerProps("interceptor-group", "false", + embeddedKafka); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put("value.deserializer", JsonDeserializer.class); + consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + consumer = cf.createConsumer(); + embeddedKafka.consumeFromEmbeddedTopics(consumer, "interceptor-out"); + } + + @AfterAll + public static void tearDown() { + consumer.close(); + } + + @Test + void interceptorsAreAppliedBeforeEventTypeRoutingInOrder() throws Exception { + INTERCEPTED_RECORDS.clear(); + interceptorLatch = new CountDownLatch(4); + SpringApplication app = new SpringApplication(InterceptorConfiguration.class); + app.setWebApplicationType(WebApplicationType.NONE); + + try (ConfigurableApplicationContext context = app.run( + "--server.port=0", + "--spring.jmx.enabled=false", + "--spring.cloud.function.definition=process", + "--spring.cloud.stream.bindings.process-in-0.destination=interceptor-in", + "--spring.cloud.stream.bindings.process-out-0.destination=interceptor-out", + "--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo", + "--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=interceptor-test-id", + "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + senderProps.put("value.serializer", JsonSerializer.class); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + try { + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic("interceptor-in"); + Headers headers = new RecordHeaders(); + headers.add(new RecordHeader("event_type", "foo".getBytes())); + template.send(new ProducerRecord<>("interceptor-in", 0, 1, new Foo("matched"), headers)); + template.send(new ProducerRecord<>("interceptor-in", 0, 2, new Foo("filtered"))); + + assertThat(interceptorLatch.await(10, TimeUnit.SECONDS)).isTrue(); + ConsumerRecords records = KafkaTestUtils.getRecords(consumer); + + assertThat(records.count()).isEqualTo(1); + assertThat(records.iterator().next().key()).isEqualTo(1); + assertThat(INTERCEPTED_RECORDS).containsExactly( + "first:process-in-0:interceptor-in", + "second:process-in-0:interceptor-in", + "first:process-in-0:interceptor-in", + "second:process-in-0:interceptor-in"); + } + finally { + pf.destroy(); + } + } + } + + @EnableAutoConfiguration + public static class InterceptorConfiguration { + + @Bean + public Function, KStream> process() { + return input -> input; + } + + @Bean + @Order(1) + public KafkaStreamsRecordInterceptor firstInterceptor() { + return (record, context) -> { + INTERCEPTED_RECORDS.add("first:" + context.bindingName() + ":" + context.topic().orElse("unknown")); + interceptorLatch.countDown(); + }; + } + + @Bean + @Order(2) + public KafkaStreamsRecordInterceptor secondInterceptor() { + return (record, context) -> { + INTERCEPTED_RECORDS.add("second:" + context.bindingName() + ":" + context.topic().orElse("unknown")); + interceptorLatch.countDown(); + }; + } + + } + + static class Foo { + + private String foo; + + Foo() { + } + + Foo(String foo) { + this.foo = foo; + } + + public String getFoo() { + return this.foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Foo foo1 = (Foo) o; + return Objects.equals(this.foo, foo1.foo); + } + + @Override + public int hashCode() { + return Objects.hash(this.foo); + } + + } + +}