Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ pub struct ActiveSubscribe {
pub start_time: EpochMillis,
/// How to present the subscribe's output.
pub output: SubscribeOutput,
/// If true, this is an internal subscribe that should not appear in
/// introspection tables like mz_subscriptions.
pub internal: bool,
}

impl ActiveSubscribe {
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,11 @@ impl Coordinator {
for sink_id in sink_ids {
let sink = match self.remove_active_compute_sink(sink_id).await {
None => {
tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink");
// This can happen due to a race condition: an internal
// subscribe may be cleaned up via its own message while
// session disconnect cleanup is in progress. This is
// benign.
tracing::debug!(%sink_id, "drop_compute_sinks: sink already removed");
continue;
}
Some(sink) => sink,
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/sequencer/inner/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ impl Coordinator {
depends_on: dependency_ids,
start_time: self.now(),
output: plan.output,
internal: false,
};
active_subscribe.initialize();

Expand Down
39 changes: 25 additions & 14 deletions src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,28 @@ impl Coordinator {
.drop_sinks
.insert(id);

let ret_fut = match &active_sink {
let ret_fut: BuiltinTableAppendNotify = match &active_sink {
ActiveComputeSink::Subscribe(active_subscribe) => {
let update =
self.catalog()
.state()
.pack_subscribe_update(id, active_subscribe, Diff::ONE);
let update = self.catalog().state().resolve_builtin_table_update(update);
let table_update_fut = if !active_subscribe.internal {
let update = self.catalog().state().pack_subscribe_update(
id,
active_subscribe,
Diff::ONE,
);
let update = self.catalog().state().resolve_builtin_table_update(update);

self.builtin_table_update().execute(vec![update]).await.0
} else {
// Internal subscribes skip the builtin table update.
Box::pin(std::future::ready(()))
};

self.metrics
.active_subscribes
.with_label_values(&[session_type])
.inc();

self.builtin_table_update().execute(vec![update]).await.0
table_update_fut
}
ActiveComputeSink::CopyTo(_) => {
self.metrics
Expand Down Expand Up @@ -312,13 +320,16 @@ impl Coordinator {

match &sink {
ActiveComputeSink::Subscribe(active_subscribe) => {
let update = self.catalog().state().pack_subscribe_update(
id,
active_subscribe,
Diff::MINUS_ONE,
);
let update = self.catalog().state().resolve_builtin_table_update(update);
self.builtin_table_update().blocking(vec![update]).await;
// Skip builtin table update for internal subscribes
if !active_subscribe.internal {
let update = self.catalog().state().pack_subscribe_update(
id,
active_subscribe,
Diff::MINUS_ONE,
);
let update = self.catalog().state().resolve_builtin_table_update(update);
self.builtin_table_update().blocking(vec![update]).await;
}

self.metrics
.active_subscribes
Expand Down
Loading