Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ public final class StreamingDataflowWorker {
// Experiment make the monitor within BoundedQueueExecutor fair
public static final String BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT =
"windmill_bounded_queue_executor_use_fair_monitor";
// Don't use. Experiment guarding multi key bundles. The feature is work in progress and
// incomplete.
private static final String UNSTABLE_ENABLE_MULTI_KEY_BUNDLE = "unstable_enable_multi_key_bundle";

private final WindmillStateCache stateCache;
private AtomicReference<StreamingWorkerStatusPages> statusPages = new AtomicReference<>();
Expand Down Expand Up @@ -1017,14 +1020,17 @@ private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, l
private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
boolean useFairMonitor =
DataflowRunner.hasExperiment(options, BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT);
boolean useKeyGroupWorkQueue =
DataflowRunner.hasExperiment(options, UNSTABLE_ENABLE_MULTI_KEY_BUNDLE);
return new BoundedQueueExecutor(
chooseMaxThreads(options),
THREAD_EXPIRATION_TIME_SEC,
TimeUnit.SECONDS,
chooseMaxBundlesOutstanding(options),
chooseMaxBytesOutstanding(options),
new ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
useFairMonitor);
useFairMonitor,
useKeyGroupWorkQueue);
}

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,24 @@ public void run(BoundedQueueExecutorWorkHandle handle) {
}
}

public final WorkId id() {
public WorkId id() {
return work().id();
}

public final Windmill.WorkItem getWorkItem() {
public Windmill.WorkItem getWorkItem() {
return work().getWorkItem();
}

@Override
public String toString() {
return "ExecutableWork{" + id() + "}";
}

public String getComputationId() {
return work().getComputationId();
}

public Work.KeyGroup getKeyGroup() {
return work().getKeyGroup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.IntSummaryStatistics;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand All @@ -74,6 +76,7 @@ public final class Work implements RefreshableWork {
private final Instant startTime;
private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
private final WorkId id;
private final KeyGroup keyGroup;
private final String latencyTrackingId;
private final long serializedWorkItemSize;
private volatile TimedState currentState;
Expand Down Expand Up @@ -101,6 +104,10 @@ private Work(
// keyUniverse inside EnumMap every time.
this.totalDurationPerState = new EnumMap<>(EMPTY_ENUM_MAP);
this.id = WorkId.of(workItem);
this.keyGroup =
workItem.hasKeyGroup()
? KeyGroup.create(workItem.getKeyGroup().getHigh(), workItem.getKeyGroup().getLow())
: KeyGroup.DEFAULT;
this.latencyTrackingId =
Long.toHexString(workItem.getShardingKey())
+ '-'
Expand Down Expand Up @@ -383,6 +390,14 @@ private boolean isCommitPending() {
abstract Instant startTime();
}

public String getComputationId() {
return processingContext.computationId();
}

public KeyGroup getKeyGroup() {
return keyGroup;
}

@AutoValue
public abstract static class ProcessingContext {

Expand Down Expand Up @@ -416,4 +431,60 @@ private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest reque
return Optional.ofNullable(getDataClient().getStateData(computationId(), request));
}
}

/**
* WorkItems with same key group and computation are eligible to be executed together in a
* multi-key bundle.
*/
public static final class KeyGroup {

// Work items equaling to the default keyGroup will always be executed
// separately and not in a multi-key bundle
public static final KeyGroup DEFAULT = new KeyGroup(0, 0);

private final long high;
private final long low;

private KeyGroup(long high, long low) {
this.high = high;
this.low = low;
}

public static KeyGroup create(long high, long low) {
if (high == 0 && low == 0) {
return DEFAULT;
}
return new KeyGroup(high, low);
Comment thread
scwhittle marked this conversation as resolved.
}

public long high() {
return high;
}

public long low() {
return low;
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (!(o instanceof KeyGroup)) {
return false;
}
KeyGroup other = (KeyGroup) o;
return high == other.high && low == other.low;
}

@Override
public int hashCode() {
return Objects.hash(high, low);
}

@Override
public String toString() {
return String.format("%016x%016x", high, low);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.BoundedQueueExecutorWorkHandle;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.KeyGroup;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
import org.checkerframework.checker.nullness.qual.Nullable;

/** An executor for executing work on windmill items. */
@SuppressWarnings({
Expand Down Expand Up @@ -78,14 +81,19 @@ private static class Budget {
@GuardedBy("this")
private long totalTimeMaxActiveThreadsUsed;

// If set the keyGroupWorkQueue is used by the underlying executor.
private final @Nullable KeyGroupWorkQueue keyGroupWorkQueue;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment,
// If set the keyGroupWorkQueue is used by the underlying executor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


public BoundedQueueExecutor(
int initialMaximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory,
boolean useFairMonitor) {
boolean useFairMonitor,
boolean useKeyGroupWorkQueue) {
this.keyGroupWorkQueue = useKeyGroupWorkQueue ? new KeyGroupWorkQueue(useFairMonitor) : null;
this.maximumPoolSize = initialMaximumPoolSize;
monitor = new Monitor(useFairMonitor);
executor =
Expand All @@ -94,7 +102,7 @@ public BoundedQueueExecutor(
initialMaximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
keyGroupWorkQueue != null ? keyGroupWorkQueue : new LinkedBlockingQueue<>(),
threadFactory) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
Expand Down Expand Up @@ -313,7 +321,7 @@ public synchronized void close() {
}
}

private static final class QueuedWork implements Runnable {
static final class QueuedWork implements Runnable {

private final ExecutableWork work;
private final BoundedQueueExecutorWorkHandleImpl handle;
Expand Down Expand Up @@ -378,6 +386,22 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes)
return new BoundedQueueExecutorWorkHandleImpl(elements, bytes);
}

public @Nullable ExecutableWork pollWork(
String computationId, Work.KeyGroup keyGroup, BoundedQueueExecutorWorkHandle handle) {
Comment thread
scwhittle marked this conversation as resolved.
checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl);
checkArgument(computationId != null && keyGroup != null && !keyGroup.equals(KeyGroup.DEFAULT));
BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle;
if (keyGroupWorkQueue == null) {
return null;
}
QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nullable

if (queuedWork == null) {
return null;
}
internalHandle.merge(queuedWork.getHandle());
return queuedWork.getWork();
}

private void decrementCounters(int elements, long bytes) {
// All threads queue decrements and one thread grabs the monitor and updates
// counters. We do this to reduce contention on monitor which is locked by
Expand Down
Loading
Loading