Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* @author Soby Chacko
* @author Ralf Wiedmann
* @author Gihong Park
* @author Nikita Kibitkin
* @since 3.0.0
*/
public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware {
Expand Down Expand Up @@ -447,6 +448,7 @@ protected Serde<?> getValueSerde(String inboundName, KafkaStreamsConsumerPropert
stream = streamsBuilder.stream(Arrays.asList(bindingTargets),
consumed);
}
applyKafkaStreamsRecordInterceptors(inboundName, stream);
//Check to see if event type based routing is enabled.
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
Expand Down Expand Up @@ -474,6 +476,37 @@ protected Serde<?> getValueSerde(String inboundName, KafkaStreamsConsumerPropert
return getkStream(bindingProperties, stream, nativeDecoding);
}

private void applyKafkaStreamsRecordInterceptors(String inboundName, KStream<?, ?> stream) {
List<KafkaStreamsRecordInterceptor> kafkaStreamsRecordInterceptors =
this.applicationContext.getBeanProvider(KafkaStreamsRecordInterceptor.class).orderedStream().toList();
if (!kafkaStreamsRecordInterceptors.isEmpty()) {
stream.process((ProcessorSupplier<Object, Object, Void, Void>) () -> new Processor<Object, Object, Void, Void>() {

ProcessorContext<Void, Void> context;

@Override
public void init(ProcessorContext<Void, Void> context) {
Processor.super.init(context);
this.context = context;
}

@Override
public void process(Record<Object, Object> record) {
KafkaStreamsRecordInterceptorContext interceptorContext =
new KafkaStreamsRecordInterceptorContext(inboundName, this.context.recordMetadata());
for (KafkaStreamsRecordInterceptor interceptor : kafkaStreamsRecordInterceptors) {
interceptor.intercept(record, interceptorContext);
}
}

@Override
public void close() {
Processor.super.close();
}
});
}
}

private Serde<?> getValueSerdeToUse(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde<?> valueSerde) {
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
return kafkaStreamsConsumerProperties.isUseConfiguredSerdeWhenRoutingEvents() ? valueSerde : new Serdes.BytesSerde();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.streams;

import org.apache.kafka.streams.processor.api.Record;

/**
* Interceptor invoked for records consumed by the Kafka Streams binder before the
* user function sees the inbound {@code KStream}.
*
* @author Nikita Kibitkin
* @since 5.0.2
*/
@FunctionalInterface
public interface KafkaStreamsRecordInterceptor {

/**
* Intercept the inbound Kafka Streams record.
* @param record the inbound record
* @param context metadata about the intercepted record and binding
*/
void intercept(Record<?, ?> record, KafkaStreamsRecordInterceptorContext context);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Optional;

import org.apache.kafka.streams.processor.api.RecordMetadata;

import org.springframework.util.Assert;

/**
* Context for a record intercepted by a {@link KafkaStreamsRecordInterceptor}.
*
* @author Nikita Kibitkin
* @since 5.0.2
*/
public record KafkaStreamsRecordInterceptorContext(String bindingName, Optional<RecordMetadata> recordMetadata) {

/**
* Create a new {@link KafkaStreamsRecordInterceptorContext}.
* @param bindingName the inbound binding name
* @param recordMetadata metadata for the intercepted record
*/
public KafkaStreamsRecordInterceptorContext {
Assert.hasText(bindingName, "'bindingName' must not be empty");
Assert.notNull(recordMetadata, "'recordMetadata' must not be null");
}

/**
* Return the source topic for the intercepted record, if available.
* @return the source topic
*/
public Optional<String> topic() {
return this.recordMetadata.map(RecordMetadata::topic);
}

/**
* Return the source partition for the intercepted record, if available.
* @return the source partition
*/
public Optional<Integer> partition() {
return this.recordMetadata.map(RecordMetadata::partition);
}

/**
* Return the source offset for the intercepted record, if available.
* @return the source offset
*/
public Optional<Long> offset() {
return this.recordMetadata.map(RecordMetadata::offset);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Optional;

import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Nikita Kibitkin
*/
class KafkaStreamsRecordInterceptorContextTests {

@Test
void recordMetadataAccessorsReturnValuesWhenMetadataIsAvailable() {
KafkaStreamsRecordInterceptorContext context = new KafkaStreamsRecordInterceptorContext("process-in-0",
Optional.of(recordMetadata("foo", 1, 42L)));

assertThat(context.topic()).contains("foo");
assertThat(context.partition()).contains(1);
assertThat(context.offset()).contains(42L);
}

@Test
void recordMetadataAccessorsReturnEmptyWhenMetadataIsUnavailable() {
KafkaStreamsRecordInterceptorContext context = new KafkaStreamsRecordInterceptorContext("process-in-0",
Optional.empty());

assertThat(context.topic()).isEmpty();
assertThat(context.partition()).isEmpty();
assertThat(context.offset()).isEmpty();
}

private RecordMetadata recordMetadata(String topic, int partition, long offset) {
return new RecordMetadata() {

@Override
public String topic() {
return topic;
}

@Override
public int partition() {
return partition;
}

@Override
public long offset() {
return offset;
}

};
}

}
Loading