[GSoC 2026] Kafka Streams runner — ExecutableStage (stateless ParDo) translator #38764
Summary of Changes

This pull request implements the third sub-issue of the Kafka Streams runner GSoC 2026 project, focusing on the execution of stateless user code. By adding the ExecutableStage translator and the necessary bridge to the SDK harness, the runner can now execute fused ParDo operations using the Fn API. This change includes infrastructure for managing harness contexts and robust testing utilities to verify execution within the Kafka Streams topology.
Code Review
This pull request implements the translation and execution of fused executable stages (beam:runner:executable_stage:v1) for the Kafka Streams runner, allowing stateless user code to run in the Beam SDK harness. The feedback highlights two critical issues: a concurrency bug in ExecutableStageProcessor due to the use of a non-thread-safe ArrayDeque for queueing outputs across threads, which should be replaced with ConcurrentLinkedQueue, and a potential memory leak in KafkaStreamsExecutableStageContextFactory where job factories are not cleaned up from the map when their reference count drops to zero.
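The leak described above comes down to a reference-counted map whose entry is never removed. A minimal sketch of the fix, assuming a registry keyed by job id, could look like the following. `RefCountedRegistry`, `acquire` and `release` are hypothetical names for illustration, not the PR's actual API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical stand-in for a per-job factory registry; not the PR's actual class. */
final class RefCountedRegistry<V> {
  private static final class Entry<V> {
    final V value;
    int refs; // only mutated inside compute/computeIfPresent, which are atomic per key

    Entry(V value) {
      this.value = value;
    }
  }

  private final Map<String, Entry<V>> entries = new ConcurrentHashMap<>();

  /** Returns the shared value for jobId, creating it on first use, and bumps its count. */
  V acquire(String jobId, Supplier<V> creator) {
    return entries.compute(jobId, (id, e) -> {
      if (e == null) {
        e = new Entry<>(creator.get());
      }
      e.refs++;
      return e;
    }).value;
  }

  /** Drops one reference; returning null from the remapping function removes the entry. */
  void release(String jobId) {
    entries.computeIfPresent(jobId, (id, e) -> --e.refs == 0 ? null : e);
  }

  int size() {
    return entries.size();
  }
}
```

Doing the decrement and the removal inside one `computeIfPresent` call keeps the two steps atomic for that key, so a concurrent `acquire` cannot observe an entry whose count has already dropped to zero.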
Code Review
This pull request introduces support for executing fused ExecutableStage nodes (stateless user code) in the Beam SDK harness over the Fn API for the Kafka Streams runner. It adds the ExecutableStageProcessor to run the fused code, the ExecutableStageTranslator to handle the translation, and a context factory to manage shared SDK harness environments. Feedback on the changes highlights a concurrency issue in ExecutableStageProcessor where a non-thread-safe ArrayDeque is used for pending outputs, which should be replaced with ConcurrentLinkedQueue. Additionally, the translator should explicitly reject unsupported features like side inputs, user states, and timers to fail fast with descriptive errors.
Assigning reviewers:

R: @chamikaramj added as fallback since no labels match configuration

The PR bot will only process comments in the main thread (not review comments).
- ExecutableStageProcessor: switch pendingOutputs from ArrayDeque to ConcurrentLinkedQueue. The queue is populated by SDK harness threads through the OutputReceiverFactory callback and drained by the Kafka Streams processing thread on bundle close; ArrayDeque is not thread-safe and offers no cross-thread visibility guarantees.
- KafkaStreamsExecutableStageContextFactory: drop the per-job entry from jobFactories in the release callback so a long-lived JVM that runs many jobs does not accumulate one factory per finished job.
- ExecutableStageTranslator: read the main input PCollection id from stagePayload.getInput() instead of Iterables.getOnlyElement on the PTransform inputs map (which conflated main + side inputs), and add explicit fail-fast rejections for side inputs, user state and timers so users get a clear message rather than a silent miss.
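The cross-thread handoff in the first bullet can be sketched with the standard library alone. This is a simplified illustration, assuming string elements and a single producer thread standing in for the SDK harness; `PendingOutputs`, `onOutput` and `drain` are hypothetical names, not the processor's real methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of a queue filled by one thread and drained by another. */
final class PendingOutputs {
  // ConcurrentLinkedQueue is safe for concurrent add/poll; ArrayDeque is not.
  private final Queue<String> pending = new ConcurrentLinkedQueue<>();

  /** Called from the producer (harness-side) thread for every output element. */
  void onOutput(String element) {
    pending.add(element);
  }

  /** Called from the consumer (processing) thread once the bundle has closed. */
  List<String> drain() {
    List<String> out = new ArrayList<>();
    String next;
    while ((next = pending.poll()) != null) {
      out.add(next);
    }
    return out;
  }

  /** Produces three elements on a separate thread, waits for it, then drains. */
  static List<String> demo() {
    PendingOutputs outputs = new PendingOutputs();
    Thread producer = new Thread(() -> {
      for (int i = 0; i < 3; i++) {
        outputs.onOutput("e" + i);
      }
    });
    producer.start();
    try {
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return outputs.drain();
  }
}
```

In this demo `join()` already establishes a happens-before edge, so the concurrent queue matters most when the consumer cannot rely on such a barrier for every producer thread.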
je-ik left a comment
This is a solid foundation; I have left a few comments in the code. I have two related questions:
a) I don't see a Coder being used when pushing data to the SDK harness. Because the SDK defines the serialization format, we must work with it; otherwise the SDK harness will crash when it tries to interpret the data we feed into it.
b) I think this deficiency will show up if we create a test that builds a complete Pipeline and runs it through the runner, e.g.
Pipeline p = Pipeline.create(opts);
PCollection output = p.apply(Impulse.create())
    .apply(MapElements.into(..).via(e -> 1))
    .apply(Redistribute.of()) // we don't have this yet, and it will break the fusion, so we will need VarIntCoder; if we remove it, it might work under the current implementation
    .apply(/* collect */);
p.run();
// assert we have collected the element
If this works, it is probably because Impulse produces byte[]. Once we are able to produce (unfused) elements of other types (e.g. int), the need for a coder will become apparent.
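To illustrate why the wire format matters, here is a stdlib-only sketch of a base-128 variable-length integer encoding of the kind a varint coder produces. It is an illustration of the format, not Beam's VarIntCoder, and `VarIntSketch` is a hypothetical name. The point is that the integer 1 becomes a single byte rather than a fixed four-byte layout, so a receiver expecting one format cannot read the other.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical base-128 varint sketch; not Beam's actual coder implementation. */
final class VarIntSketch {
  /** Encodes 7 bits per byte, lowest group first; the high bit marks "more bytes follow". */
  static byte[] encode(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int v = value;
    do {
      int bits = v & 0x7F;
      v >>>= 7; // unsigned shift, so negative values terminate after five bytes
      out.write(v != 0 ? bits | 0x80 : bits);
    } while (v != 0);
    return out.toByteArray();
  }

  /** Reassembles the 7-bit groups until a byte without the continuation bit is seen. */
  static int decode(byte[] bytes) {
    int result = 0;
    int shift = 0;
    for (byte b : bytes) {
      result |= (b & 0x7F) << shift;
      shift += 7;
      if ((b & 0x80) == 0) {
        break;
      }
    }
    return result;
  }
}
```

A runner that hands the harness raw bytes works only while the element type already is byte[]; for any other type, both sides have to agree on an encoding like this one.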
- ExecutableStageProcessor: drain pendingOutputs with forEach + clear instead of a poll loop (cleaner, no behavioural change: the harness has already completed the bundle by the time we drain, so ConcurrentLinkedQueue's weakly-consistent iterator is fine).
- ExecutableStageProcessor.forwardWatermark: add a TODO referencing Jan's note that watermarks must reach every parallel instance of every downstream processor, which Kafka Streams' key-routed forward does not give us by itself; that fan-out lands with the WatermarkManager sub-issue.
- ExecutableStageTranslator: add a code comment to the multi-output rejection explaining the limitation is temporary (output-tag dispatch + per-output PCollection routing is a planned follow-up).
- SharedTestCollector: implement AutoCloseable; close() removes the per-UUID entry from the static registry so a long-lived JVM that runs many tests does not accumulate orphan entries.
- ExecutableStageTranslatorTest: use try-with-resources around the collector so the entry is cleaned up at the end of the test.
Summary
Third sub-issue under the Kafka Streams runner GSoC 2026 project. Adds the ExecutableStage translator and SDK-harness bridge so fused stateless user code (ParDo etc.) actually runs in the SDK harness over the Fn API and its outputs flow back into the topology.
Per design doc §4.2 and the live discussion with @je-ik on the issue.
What's in this PR
- KafkaStreamsExecutableStageContextFactory mirroring Flink's pattern.
- ExecutableStageProcessor (the harness bridge) + ExecutableStageTranslator.
- ExecutableStage.URN; prepareForTranslation now runs GreedyPipelineFuser.
- KStreamsPayload.toString uses MoreObjects.toStringHelper (post-merge tweak from [GSoC 2026] Kafka Streams runner — translation framework + Impulse translator #38689).
- testImplementation project(':sdks:java:harness') for the EMBEDDED environment.

Tests
- ExecutableStageTranslatorTest builds Impulse -> ParDo via the Beam Java SDK, drives the resulting topology under TopologyTestDriver with the EMBEDDED environment, and asserts via side effect that the DoFn ran in the SDK harness with the expected input. Approach discussed with @je-ik on [GSoC 2026] Kafka Streams Runner — ExecutableStage (stateless ParDo) translator #38743: because the ParDo's output has no downstream consumer, the stage has no output PCollection and the harness does not deliver the value back to the runner (per Beam semantics), so the bridge is verified by a recorded side effect.
- SharedTestCollector<T> helper: instances are Serializable but their identity is a UUID; the actual storage lives in a static registry keyed by UUID. Survives the runner cloning the DoFn (current or future EMBEDDED behaviour).
- KafkaStreamsPipelineTranslatorTest updated so the Impulse case builds via the Beam SDK (validator-compliant proto for the fuser) and the URN-rejection case calls translate directly to keep the dispatch-loop contract isolated.
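The UUID-keyed registry idea behind the collector helper can be sketched as follows. This is a simplified, non-generic illustration with String elements, not the PR's SharedTestCollector<T>; `CollectorSketch` and `serializedCopy` are hypothetical names, and Java serialization stands in for whatever cloning the runner performs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical collector whose identity is a UUID and whose storage is static. */
final class CollectorSketch implements Serializable, AutoCloseable {
  private static final long serialVersionUID = 1L;
  private static final Map<UUID, List<String>> REGISTRY = new ConcurrentHashMap<>();

  private final UUID id = UUID.randomUUID();

  CollectorSketch() {
    REGISTRY.put(id, new CopyOnWriteArrayList<>());
  }

  /** Writes into the shared storage, so clones of this instance land in the same list. */
  void add(String element) {
    List<String> store = REGISTRY.get(id);
    if (store == null) {
      throw new IllegalStateException("collector is closed");
    }
    store.add(element);
  }

  List<String> elements() {
    return List.copyOf(REGISTRY.getOrDefault(id, List.of()));
  }

  /** Removes this collector's entry so the static registry does not grow across tests. */
  @Override
  public void close() {
    REGISTRY.remove(id);
  }

  static int registrySize() {
    return REGISTRY.size();
  }

  /** Round-trips this instance through Java serialization; only the UUID travels. */
  CollectorSketch serializedCopy() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(this);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CollectorSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Because deserialization restores only the UUID field, a cloned instance resolves to the same registry entry as the original, which is what lets the test observe a side effect recorded by a copy of the DoFn.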
Validation
- ./gradlew :runners:kafka-streams:check green locally (14 tests).
- No @SuppressWarnings of any flavor in the new code.

Notes / deferred
- ExecutableStageProcessor is provisional. When the WatermarkManager lands, the output watermark will be min() across received watermarks and the flush should fire only when that minimum moves forward, not on every received watermark; a comment in the processor flags this.
- KStreamsPayloadCoder + KafkaSerde for topic-boundary serialization is still deferred to the first sub-issue that introduces a topic boundary (GBK / repartition).
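The deferred watermark rule in the first note can be sketched as a small tracker. This is an assumption-laden illustration, not the planned WatermarkManager: it assumes watermarks are epoch-millisecond longs, that each upstream has a string id, and that the number of upstreams is known up front. `MinWatermarkTracker` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker: output watermark is the minimum over all upstream watermarks. */
final class MinWatermarkTracker {
  private final Map<String, Long> perUpstream = new HashMap<>();
  private final int expectedUpstreams;
  private long output = Long.MIN_VALUE;

  MinWatermarkTracker(int expectedUpstreams) {
    this.expectedUpstreams = expectedUpstreams;
  }

  /**
   * Records a watermark from one upstream and returns true only when the minimum
   * across all upstreams has moved forward, i.e. when a flush would be warranted.
   */
  boolean update(String upstreamId, long watermark) {
    // Keep each upstream's watermark monotonic even if a stale value arrives.
    perUpstream.merge(upstreamId, watermark, Math::max);
    if (perUpstream.size() < expectedUpstreams) {
      return false; // some upstream has not reported yet, so the minimum is unknown
    }
    long min = Long.MAX_VALUE;
    for (long w : perUpstream.values()) {
      min = Math.min(min, w);
    }
    if (min > output) {
      output = min;
      return true;
    }
    return false;
  }

  long outputWatermark() {
    return output;
  }
}
```

With two upstreams, a watermark of 20 from the faster one changes nothing while the slower one still sits at 5; the flush fires only once the slower upstream advances.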
Closes #38743
Refs #18479
cc @je-ik