-
Notifications
You must be signed in to change notification settings - Fork 36
Add mobile_verifier_compare crate: PG ↔ Trino reward-query parity CLI #1199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
macpie
wants to merge
4
commits into
main
Choose a base branch
from
macpie/awesome-goodall-68d09c
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
5fecdb7
Add mobile_verifier_compare crate: PG ↔ Trino reward-query parity CLI
macpie 91a2311
Fmt
macpie 98f4222
Fix mobile_verifier_compare: use production constants instead of hard…
macpie d0f8f0c
Fix mobile_verifier_compare parity issues caught in PR review
macpie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| # Auto-written by `mobile-verifier-compare seed` on first run, holds | ||
| # docker-compose-defaults for local dev. Regenerated automatically — not | ||
| # meant to be committed. | ||
| pkg/compare-trino-settings.local.toml |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| [package] | ||
| name = "mobile-verifier-compare" | ||
| version = "0.1.0" | ||
| description = "Validation CLI: diffs mobile_verifier's Postgres reward-input queries against equivalent Trino/Iceberg queries, with a local-stack seed bootstrapper." | ||
| license.workspace = true | ||
| edition.workspace = true | ||
| authors.workspace = true | ||
|
|
||
| [[bin]] | ||
| name = "mobile-verifier-compare" | ||
| path = "src/main.rs" | ||
|
|
||
| [dependencies] | ||
| # Production crate we call into so the Postgres side of every comparison is | ||
| # *byte-identical* to what the reward pipeline runs (zero SQL drift). | ||
| mobile-verifier = { path = "../mobile_verifier" } | ||
|
|
||
| # Trino client (no longer optional anywhere — it's the whole point of this crate). | ||
| trino-client = { path = "../trino_client" } | ||
| trino-rust-client = { version = "0.9" } | ||
|
|
||
| # Settings + PG pool plumbing (same shape mobile_verifier uses). | ||
| db-store = { path = "../db_store" } | ||
| custom-tracing = { path = "../custom_tracing" } | ||
|
|
||
| # General workspace deps. Kept in lockstep with mobile_verifier's set so this | ||
| # crate sees the same versions as the functions it calls into. | ||
| anyhow = { workspace = true } | ||
| chrono = { workspace = true } | ||
| clap = { workspace = true } | ||
| config = { workspace = true } | ||
| futures = { workspace = true } | ||
| helium-crypto = { workspace = true, features = ["sqlx-postgres"] } | ||
| rand = { workspace = true } | ||
| rust_decimal = { workspace = true } | ||
| serde = { workspace = true } | ||
| sqlx = { workspace = true } | ||
| tokio = { workspace = true } | ||
| tracing = { workspace = true } | ||
| uuid = { workspace = true } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| # Settings file for `mobile-verifier compare-trino`. | ||
| # | ||
| # This is a *minimal* config that only carries what the diff CLI needs: | ||
| # - a Postgres URL for the mobile_verifier database | ||
| # - a Trino endpoint to query the corresponding Iceberg tables | ||
| # | ||
| # The full mobile_verifier daemon needs much more (buckets, signing keypair, | ||
| # config client, …). Don't reuse this file for `mobile-verifier server`. | ||
| # | ||
| # Usage: | ||
| # cargo run -p mobile-verifier -- -c ./compare-trino-settings.toml \ | ||
| # compare-trino --start 2026-05-20T00:00:00Z --end 2026-05-21T00:00:00Z all | ||
| # | ||
| # Any value can be overridden by env var, prefix `MV__`, double underscore as | ||
| # the path separator. Examples: | ||
| # MV__DATABASE__URL=postgres://... | ||
| # MV__TRINO__AUTH__PASSWORD=... mobile-verifier -c ... compare-trino ... | ||
|
|
||
| # Optional: tracing filter. Defaults to "mobile_verifier=info". | ||
| # log = "mobile_verifier=debug,sqlx=warn" | ||
|
|
||
| # ── Postgres (mobile_verifier database) ────────────────────────────────────── | ||
| # This is the same `[database]` block used by the rest of mobile_verifier; if | ||
| # you already have a working mobile_verifier settings.toml, copy that block | ||
| # here verbatim. | ||
| [database] | ||
| url = "postgres://postgres:postgres@127.0.0.1:5432/mobile_verifier" | ||
| max_connections = 10 | ||
|
|
||
| # Optional CA bundle for TLS to RDS, etc. | ||
| # ca_path = "/etc/ssl/certs/rds-ca-bundle.pem" | ||
|
|
||
| # ── Trino ──────────────────────────────────────────────────────────────────── | ||
| # The Trino coordinator that fronts the Iceberg catalog holding `poc.*` and | ||
| # `rewards.*`. `catalog` should be the catalog name configured in Trino for | ||
| # your Iceberg REST catalog (commonly "iceberg"). | ||
| [trino] | ||
| host = "trino.example.com" | ||
| port = 443 | ||
| user = "mobile-verifier-compare" | ||
| secure = true | ||
| catalog = "iceberg" | ||
| # `schema` is optional — every query in this CLI is fully qualified (poc.*, | ||
| # rewards.*), so the default schema doesn't actually matter. | ||
| # schema = "default" | ||
|
|
||
| # For local dev against a Trino with a self-signed cert: | ||
| # insecure_skip_tls_verify = true | ||
|
|
||
| # Pick exactly one auth block, or omit entirely for anonymous access. | ||
|
|
||
| # --- Basic auth --- | ||
| # [trino.auth] | ||
| # type = "basic" | ||
| # username = "alice" | ||
| # password = "hunter2" | ||
|
|
||
| # --- JWT bearer token --- | ||
| # [trino.auth] | ||
| # type = "jwt" | ||
| # token = "eyJhbGciOi..." |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| use crate::diff::{DiffTable, FloatEq, DEFAULT_F64_EPSILON}; | ||
| use crate::CommonArgs; | ||
| use anyhow::{Context, Result}; | ||
| use mobile_verifier::data_session::aggregate_hotspot_data_sessions_to_dc; | ||
| use serde::{Deserialize, Serialize}; | ||
| use sqlx::{Pool, Postgres}; | ||
| use std::collections::BTreeMap; | ||
| use trino_client::{Client as TrinoClient, Statement}; | ||
| use trino_rust_client::Trino; | ||
|
|
||
| #[derive(Debug, Clone, Default, PartialEq)] | ||
| struct DcSummary { | ||
| rewardable_bytes: u64, | ||
| rewardable_dc: u64, | ||
| } | ||
|
|
||
| impl FloatEq for DcSummary { | ||
| fn float_eq(&self, other: &Self, _: f64) -> bool { | ||
| self == other | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Trino, Serialize, Deserialize)] | ||
| struct TrinoDcRow { | ||
| hotspot_key: String, | ||
| rewardable_bytes: i64, | ||
| dc_transfer_reward: i64, | ||
| } | ||
|
|
||
| /// Trino-side: aggregate `rewards.data_transfer` rows whose `[start_period, end_period)` | ||
| /// overlaps the requested window. This is the *output* of the production reward | ||
| /// pipeline, not raw sessions, so this comparison is an input-vs-output sanity | ||
| /// check: PG numbers should be >= Trino numbers in a normal epoch (PG includes | ||
| /// all sessions, Trino only includes those that got rewards written). | ||
| const TRINO_SQL: &str = r#" | ||
| SELECT | ||
| hotspot_key, | ||
| cast(sum(rewardable_bytes) AS bigint) AS rewardable_bytes, | ||
| cast(sum(dc_transfer_reward) AS bigint) AS dc_transfer_reward | ||
| FROM rewards.data_transfer | ||
| WHERE start_period < :end AND end_period > :start | ||
| GROUP BY hotspot_key | ||
| "#; | ||
|
|
||
| pub async fn run(pg: &Pool<Postgres>, trino: &TrinoClient, args: &CommonArgs) -> Result<()> { | ||
| let epoch = args.epoch(); | ||
|
|
||
| let pg_raw = aggregate_hotspot_data_sessions_to_dc(pg, &epoch) | ||
| .await | ||
| .context("running aggregate_hotspot_data_sessions_to_dc")?; | ||
| let pg_rows: BTreeMap<String, DcSummary> = pg_raw | ||
| .into_iter() | ||
| .map(|(k, v)| { | ||
| ( | ||
| k.to_string(), | ||
| DcSummary { | ||
| rewardable_bytes: v.rewardable_bytes, | ||
| rewardable_dc: v.rewardable_dc, | ||
| }, | ||
| ) | ||
| }) | ||
| .collect(); | ||
|
|
||
| let stmt = Statement::new(TRINO_SQL) | ||
| .bind("start", epoch.start) | ||
| .bind("end", epoch.end) | ||
| .typed::<TrinoDcRow>(); | ||
| let trino_raw: Vec<TrinoDcRow> = trino | ||
| .get_all(stmt) | ||
| .await | ||
| .context("running Trino rewards.data_transfer query")?; | ||
| let trino_rows: BTreeMap<String, DcSummary> = trino_raw | ||
| .into_iter() | ||
| .map(|r| { | ||
| ( | ||
| r.hotspot_key, | ||
| DcSummary { | ||
| rewardable_bytes: r.rewardable_bytes.max(0) as u64, | ||
| rewardable_dc: r.dc_transfer_reward.max(0) as u64, | ||
| }, | ||
| ) | ||
| }) | ||
| .collect(); | ||
|
|
||
| let mut table = DiffTable::new( | ||
| "data-sessions — PG input aggregate vs Trino rewards.data_transfer output", | ||
| "hotspot_key", | ||
| "bytes/dc", | ||
| ) | ||
| .with_note( | ||
| "PG side sums raw hotspot_data_transfer_sessions; Trino side sums rewards.data_transfer (already-paid rewards). \ | ||
| Mismatches are expected on sessions that didn't make it to a reward (e.g. sub-DC dust).", | ||
| ) | ||
| .with_options(args.show_all, args.limit); | ||
| table.fill( | ||
| pg_rows, | ||
| trino_rows, | ||
| DEFAULT_F64_EPSILON, | ||
| |k| k.clone(), | ||
| |v| format!("{}/{}", v.rewardable_bytes, v.rewardable_dc), | ||
| |p, t| { | ||
| format!( | ||
| "bytes={:+} dc={:+}", | ||
| p.rewardable_bytes as i128 - t.rewardable_bytes as i128, | ||
| p.rewardable_dc as i128 - t.rewardable_dc as i128, | ||
| ) | ||
| }, | ||
| ); | ||
| table.print(); | ||
| Ok(()) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.