Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2696,7 +2696,7 @@ impl Coordinator {
};

// Disallow mz_now in any position because read time and write time differ.
let contains_temporal = return_if_err!(selection.contains_temporal(), ctx)
let contains_temporal = selection.contains_temporal()
|| assignments.values().any(|e| e.contains_temporal())
|| returning.iter().any(|e| e.contains_temporal());
if contains_temporal {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer/inner/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl Coordinator {
.catalog()
.validate_timeline_context(source_ids.iter().copied())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent)
&& plan.source.contains_temporal()?
&& plan.source.contains_temporal()
{
// If the source IDs are timestamp independent but the query contains temporal functions,
// then the timeline context needs to be upgraded to timestamp dependent. This is
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/frontend_peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl PeekClient {
let contains_temporal = match query_plan {
QueryPlan::Select(s) => s.source.contains_temporal(),
QueryPlan::CopyTo(s, _) => s.source.contains_temporal(),
QueryPlan::Subscribe(s) => Ok(s.from.contains_temporal()),
QueryPlan::Subscribe(s) => s.from.contains_temporal(),
};

// # From sequence_plan
Expand Down Expand Up @@ -548,7 +548,7 @@ impl PeekClient {
// simple benchmarks), because it traverses transitive dependencies even of indexed views and
// materialized views (also traversing their MIR plans).
let mut timeline_context = catalog.validate_timeline_context(source_ids.iter().copied())?;
if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal? {
if matches!(timeline_context, TimelineContext::TimestampIndependent) && contains_temporal {
// If the source IDs are timestamp independent but the query contains temporal functions,
// then the timeline context needs to be upgraded to timestamp dependent. This is
// required because `source_ids` doesn't contain functions.
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/optimize/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl ExprPrep for ExprPrepMaintained {
if let MirScalarExpr::CallUnmaterializable(f) = e {
last_observed_unmaterializable_func = Some(f.clone());
}
})?;
});

if let Some(f) = last_observed_unmaterializable_func {
Err(OptimizerError::UnmaterializableFunction(f))
Expand Down
27 changes: 10 additions & 17 deletions src/expr/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1263,16 +1263,14 @@ impl<E: OptimizableExpr> MapFilterProject<E> {
if let Some(i) = e.as_column() {
reference_count[i] += 1;
}
})
.expect("visit_pre hit recursion limit");
});
}
for (_, pred) in self.predicates.iter() {
pred.visit_pre(&mut |e| {
if let Some(i) = e.as_column() {
reference_count[i] += 1;
}
})
.expect("visit_pre hit recursion limit");
});
}
for proj in self.projection.iter() {
reference_count[*proj] += 1;
Expand Down Expand Up @@ -1323,15 +1321,13 @@ impl<E: OptimizableExpr> MapFilterProject<E> {
pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
for index in 0..self.expressions.len() {
let (prior, expr) = self.expressions.split_at_mut(index);
expr[0]
.visit_mut_post(&mut |e| {
if let Some(i) = e.as_column() {
if should_inline[i] {
*e = prior[i - self.input_arity].clone();
}
expr[0].visit_mut_post(&mut |e| {
if let Some(i) = e.as_column() {
if should_inline[i] {
*e = prior[i - self.input_arity].clone();
}
})
.expect("inlining hit recursion limit");
}
});
}
for (_index, pred) in self.predicates.iter_mut() {
let expressions = &self.expressions;
Expand All @@ -1341,8 +1337,7 @@ impl<E: OptimizableExpr> MapFilterProject<E> {
*e = expressions[i - self.input_arity].clone();
}
}
})
.expect("inlining hit recursion limit");
});
}
}

Expand Down Expand Up @@ -1450,7 +1445,6 @@ pub fn memoize_expr<E: OptimizableExpr>(
memoized_parts: &mut Vec<E>,
input_arity: usize,
) {
#[allow(deprecated)]
expr.visit_mut_pre_post(&mut |e| e.eager_children(), &mut |e| {
if E::is_literal(e) {
// Literals do not need to be memoized.
Expand Down Expand Up @@ -1481,8 +1475,7 @@ pub fn memoize_expr<E: OptimizableExpr>(
E::column(input_arity + memoized_parts.len()),
));
}
})
.expect("memoize_expr hit recursion limit");
});
}

pub mod util {
Expand Down
24 changes: 16 additions & 8 deletions src/expr/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ impl MirRelationExpr {
/// visited in `type_stack`.
pub fn typ(&self) -> ReprRelationType {
let mut type_stack = Vec::new();
#[allow(deprecated)]
self.visit_pre_post_nolimit(
self.visit_pre_post(
&mut |e: &MirRelationExpr| -> Option<Vec<&MirRelationExpr>> {
match &e {
MirRelationExpr::Let { body, .. } => Some(vec![&*body]),
Expand Down Expand Up @@ -960,8 +959,7 @@ impl MirRelationExpr {
/// visited in `arity_stack`.
pub fn arity(&self) -> usize {
let mut arity_stack = Vec::new();
#[allow(deprecated)]
self.visit_pre_post_nolimit(
self.visit_pre_post(
&mut |e: &MirRelationExpr| -> Option<Vec<&MirRelationExpr>> {
match &e {
MirRelationExpr::Let { body, .. } => {
Expand Down Expand Up @@ -1793,7 +1791,6 @@ impl MirRelationExpr {
pub fn try_visit_scalars_mut<F, E>(&mut self, f: &mut F) -> Result<(), E>
where
F: FnMut(&mut MirScalarExpr) -> Result<(), E>,
E: From<RecursionLimitError>,
{
self.try_visit_mut_post(&mut |expr| expr.try_visit_scalars_mut1(f))
}
Expand Down Expand Up @@ -1922,7 +1919,6 @@ impl MirRelationExpr {
pub fn try_visit_scalars<F, E>(&self, f: &mut F) -> Result<(), E>
where
F: FnMut(&MirScalarExpr) -> Result<(), E>,
E: From<RecursionLimitError>,
{
self.try_visit_post(&mut |expr| expr.try_visit_scalars_1(f))
}
Expand Down Expand Up @@ -2386,7 +2382,6 @@ impl VisitChildren<Self> for MirRelationExpr {
fn try_visit_children<F, E>(&self, mut f: F) -> Result<(), E>
where
F: FnMut(&Self) -> Result<(), E>,
E: From<RecursionLimitError>,
{
for child in self.children() {
f(child)?
Expand All @@ -2397,13 +2392,26 @@ impl VisitChildren<Self> for MirRelationExpr {
fn try_visit_mut_children<F, E>(&mut self, mut f: F) -> Result<(), E>
where
F: FnMut(&mut Self) -> Result<(), E>,
E: From<RecursionLimitError>,
{
for child in self.children_mut() {
f(child)?
}
Ok(())
}

fn children<'a>(&'a self) -> impl DoubleEndedIterator<Item = &'a MirRelationExpr>
where
Self: 'a,
{
self.children()
}

fn children_mut<'a>(&'a mut self) -> impl DoubleEndedIterator<Item = &'a mut MirRelationExpr>
where
Self: 'a,
{
self.children_mut()
}
}

/// Specification for an ordering by a column.
Expand Down
6 changes: 2 additions & 4 deletions src/expr/src/relation/canonicalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ pub fn canonicalize_equivalences<'a, I>(
// which will then replace `to_reduce[i]`.
let mut new_equivalence = Vec::with_capacity(to_reduce[i].len());
while let Some((_, mut popped_expr)) = to_reduce[i].pop() {
#[allow(deprecated)]
popped_expr.visit_mut_post_nolimit(&mut |e: &mut MirScalarExpr| {
popped_expr.visit_mut_post(&mut |e: &mut MirScalarExpr| {
// If a simpler expression can be found that is equivalent
// to e,
if let Some(simpler_e) = to_reduce.iter().find_map(|cls| {
Expand Down Expand Up @@ -395,8 +394,7 @@ fn replace_subexpr_and_reduce(
repr_column_types: &[ReprColumnType],
) -> bool {
let mut changed = false;
#[allow(deprecated)]
predicate.visit_mut_pre_post_nolimit(
predicate.visit_mut_pre_post(
&mut |e| {
// The `cond` of an if statement is not visited to prevent `then`
// or `els` from being evaluated before `cond`, resulting in a
Expand Down
3 changes: 1 addition & 2 deletions src/expr/src/relation/join_input_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ impl JoinInputMapper {
// `e` anyway, so we end up visiting nodes in `e` multiple times
// here. Alternatively, consider having the future `PredicateKnowledge`
// take over the responsibilities of this code?
#[allow(deprecated)]
expr.visit_mut_pre_post_nolimit(
expr.visit_mut_pre_post(
&mut |e| {
let mut inputs = self.lookup_inputs(e);
if let Some(first_input) = inputs.next() {
Expand Down
85 changes: 82 additions & 3 deletions src/expr/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,6 @@ impl VisitChildren<Self> for MirScalarExpr {
fn try_visit_children<F, E>(&self, mut f: F) -> Result<(), E>
where
F: FnMut(&Self) -> Result<(), E>,
E: From<RecursionLimitError>,
{
use MirScalarExpr::*;
match self {
Expand Down Expand Up @@ -1356,7 +1355,6 @@ impl VisitChildren<Self> for MirScalarExpr {
fn try_visit_mut_children<F, E>(&mut self, mut f: F) -> Result<(), E>
where
F: FnMut(&mut Self) -> Result<(), E>,
E: From<RecursionLimitError>,
{
use MirScalarExpr::*;
match self {
Expand All @@ -1381,6 +1379,20 @@ impl VisitChildren<Self> for MirScalarExpr {
}
Ok(())
}

fn children<'a>(&'a self) -> impl DoubleEndedIterator<Item = &'a Self>
where
Self: 'a,
{
self.children()
}

fn children_mut<'a>(&'a mut self) -> impl DoubleEndedIterator<Item = &'a mut Self>
where
Self: 'a,
{
self.children_mut()
}
}

impl MirScalarExpr {
Expand Down Expand Up @@ -1609,7 +1621,7 @@ impl FilterCharacteristics {
}
}
},
)?;
);
if literal_inequality_in_current_filter {
literal_inequality += 1;
}
Expand Down Expand Up @@ -2540,4 +2552,71 @@ mod tests {
);
}
}

/// Exercises the `unsafe` pointer stack in [`Visit::visit_mut_post`] with a
/// closure that *replaces subtrees* (`*expr = ...`). Miri's aliasing model
/// should shout if the "stack mirrors the call stack" becomes untrue.
#[mz_ore::test]
fn test_visit_mut_post_replace_subtrees() {
let col = MirScalarExpr::column;
let mut expr = col(0).if_then_else(col(1).if_then_else(col(2), col(3)), col(4));

expr.visit_mut_post(&mut |expr: &mut MirScalarExpr| match expr {
MirScalarExpr::Column(n, _) => *n += 1,
MirScalarExpr::If { then, .. } => {
let then = then.take();
*expr = then;
}
_ => {}
});

// collapses to then-most branch
assert_eq!(expr, col(3));
}

/// Exercises the `unsafe` pointer stack in [`Visit::visit_mut_pre_post`] with
/// a `pre` that both *replaces the visited node wholesale* (`*expr = ...`)
/// and *returns an explicit child set* borrowed from the freshly written
/// value. Miri's aliasing model should shout if the "stack mirrors the call
/// stack" becomes untrue.
#[mz_ore::test]
fn test_visit_mut_pre_post_explicit_children() {
let col = MirScalarExpr::column;
let mut expr = col(5)
.if_then_else(col(6), col(7))
.if_then_else(col(1).if_then_else(col(2), col(3)), col(4));

// turns conditions into column 0 in pre
// doesn't traverse conditions of ifs
// adds 10 to all column refs in post (but not in conditions!)
expr.visit_mut_pre_post(
&mut |expr: &mut MirScalarExpr| -> Option<Vec<&mut MirScalarExpr>> {
if let MirScalarExpr::If { .. } = expr {
let MirScalarExpr::If { then, els, .. } = expr else {
unreachable!()
};
let then = then.take();
let els = els.take();
*expr = MirScalarExpr::column(0).if_then_else(then, els);

let MirScalarExpr::If { then, els, .. } = expr else {
unreachable!()
};
Some(vec![then.as_mut(), els.as_mut()])
} else {
// Leaves recurse with their default (empty) child set.
None
}
},
&mut |expr: &mut MirScalarExpr| {
if let MirScalarExpr::Column(n, _) = expr {
*n += 10;
}
},
);

// conditions become 0; everyone else += 10
let expected = col(0).if_then_else(col(0).if_then_else(col(12), col(13)), col(14));
assert_eq!(expr, expected);
}
}
19 changes: 1 addition & 18 deletions src/expr/src/scalar/optimizable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
use std::fmt::Debug;
use std::hash::Hash;

use mz_ore::stack::RecursionLimitError;
use serde::Serialize;

use crate::scalar::columns::Columns;
use crate::scalar::func::{BinaryFunc, UnaryFunc, VariadicFunc};
use crate::visit::{Visit, VisitChildren};
use crate::visit::VisitChildren;
use crate::{MirScalarExpr, func};

/// A scalar expression type that can be optimized inside a `MapFilterProject`.
Expand Down Expand Up @@ -58,14 +57,6 @@ pub trait OptimizableExpr:
///
/// Returns `(lower_bounds, upper_bounds)` for use in `MfpPlan`.
fn extract_temporal_bounds(temporal: Vec<Self>) -> Result<(Vec<Self>, Vec<Self>), String>;

/// Visit in a pre-traversal. Defaults to the `Visit` implementation, but overridable.
fn visit_pre<F>(&self, f: &mut F) -> Result<(), RecursionLimitError>
where
F: FnMut(&Self),
{
Visit::visit_pre(self, f)
}
}

impl OptimizableExpr for MirScalarExpr {
Expand Down Expand Up @@ -169,12 +160,4 @@ impl OptimizableExpr for MirScalarExpr {

Ok((lower_bounds, upper_bounds))
}

fn visit_pre<F>(&self, f: &mut F) -> Result<(), RecursionLimitError>
where
F: FnMut(&Self),
{
MirScalarExpr::visit_pre(self, f);
Ok(())
}
}
3 changes: 1 addition & 2 deletions src/expr/src/scalar/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ pub fn reduce(expr: &mut MirScalarExpr, column_types: &[ReprColumnType]) {
let mut old = MirScalarExpr::column(0);
while old != *expr {
old = expr.clone();
#[allow(deprecated)]
expr.visit_mut_pre_post_nolimit(
expr.visit_mut_pre_post(
&mut |e| {
reduce_pre(e, column_types);
None
Expand Down
Loading
Loading