diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index a4e8fbeff3..a27635272a 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -655,6 +655,12 @@ dependencies = [ "half", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + [[package]] name = "clap" version = "4.5.4" @@ -2347,6 +2353,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "lz4_flex" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" +dependencies = [ + "twox-hash", +] + [[package]] name = "matchers" version = "0.2.0" @@ -3384,6 +3399,7 @@ dependencies = [ "bytes", "cadence", "chrono", + "cityhash-rs", "criterion", "ctrlc", "data-encoding", @@ -3392,6 +3408,7 @@ dependencies = [ "hyper 1.6.0", "insta", "json-schema-diff", + "lz4_flex", "md5", "once_cell", "parking_lot", @@ -4648,6 +4665,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.17.0" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 010fa27445..df0e53011a 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -26,11 +26,13 @@ adler = "1.0.2" anyhow = { version = "1.0.69", features = ["backtrace"] } cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } +cityhash-rs = "1.0.1" ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" futures = "0.3.21" hyper = "1.2.0" json-schema-diff = "0.1.7" +lz4_flex = "0.11" md5 = "0.7.0" parking_lot = "0.12.1" procspawn = { version = "1.0.0", features = ["json"] } diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 14cd0e0055..67476cff68 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -209,7 +209,13 @@ impl ClickhouseClient { let host = &config.host; let port = &config.http_port; - let mut base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1"); + // `decompress=1` tells ClickHouse the POST body is in its native + // compressed format (LZ4 blocks framed with CityHash128 checksums) — + // the same wire format `clickhouse-rs` used and what + // `clickhouse-compressor` produces. Distinct from HTTP-standard + // `Content-Encoding: lz4`, which would need `enable_http_compression=1`. + let mut base_url = + format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1"); if matches!(format, InsertFormat::RowBinary) { // RowBinary cannot represent JSON values natively; tell ClickHouse // to treat any binary string targeting a JSON column as JSON text. @@ -249,9 +255,10 @@ impl ClickhouseClient { } pub async fn send(&self, body: Vec, retry_config: RetryConfig) -> anyhow::Result { - // Convert to Bytes once for efficient cloning since sending the request - // moves the body into the request body. - let body_bytes = bytes::Bytes::from(body); + // Compress once before the retry loop — the encoded body is identical + // across attempts, so paying the LZ4 cost per attempt would be wasted + // work. `bytes::Bytes` makes the per-attempt clone cheap (refcount bump). + let body_bytes = bytes::Bytes::from(lz4_compress(&body)); for attempt in 0..=retry_config.max_retries { let url = self.build_url(); @@ -337,6 +344,68 @@ impl ClickhouseClient { } } +/// ClickHouse native compressed-block size cap. Matches the server's +/// `max_compress_block_size` default; sending larger blocks risks tripping +/// server-side decompress limits. +const LZ4_BLOCK_SIZE: usize = 1024 * 1024; + +/// ClickHouse compression method identifier for LZ4 in the native block header. +const LZ4_METHOD_BYTE: u8 = 0x82; + +/// CityHash128 over `data` in the wire layout ClickHouse's +/// `CompressedReadBuffer` reads: 8 little-endian bytes of the low 64-bit half +/// first, then 8 little-endian bytes of the high half. +/// +/// `cityhash-rs` returns a `u128` with the halves swapped relative to that +/// convention (the canonical "low" half ends up in the upper 64 bits of the +/// returned `u128`), so a naive `to_le_bytes()` puts the wrong half first +/// and ClickHouse rejects the body with `CANNOT_DECOMPRESS / Checksum +/// doesn't match`. Rotating by 64 swaps the halves back into the order CH +/// expects. +/// +/// We use CityHash 1.0.2 — that's the variant ClickHouse bundles for +/// compression checksums; the 110 variant is reserved for newer hash columns +/// and is NOT interchangeable here. +fn ch_compression_checksum(data: &[u8]) -> [u8; 16] { + cityhash_rs::cityhash_102_128(data) + .rotate_left(64) + .to_le_bytes() +} + +/// Encode `input` in ClickHouse's native compressed format — the same wire +/// shape `clickhouse-rs` and `clickhouse-compressor` produce, and what the +/// server expects when `decompress=1` is set in the URL. +/// +/// The body is a concatenation of one or more blocks. Each block is laid out: +/// +/// [0..16] CityHash128(header || compressed), low half LE then high half LE +/// [16] LZ4_METHOD_BYTE (0x82) +/// [17..21] u32 LE: compressed size INCLUDING the 9-byte header +/// [21..25] u32 LE: uncompressed size of this block +/// [25..] raw LZ4 block bytes (no frame, no prepended size) +/// +/// The 9-byte (method + sizes) header is hashed together with the compressed +/// bytes so the checksum guards both. +fn lz4_compress(input: &[u8]) -> Vec { + let mut out = Vec::with_capacity(input.len() / 2 + 32); + for chunk in input.chunks(LZ4_BLOCK_SIZE) { + let compressed = lz4_flex::block::compress(chunk); + let compressed_with_header = 9u32 + compressed.len() as u32; + let uncompressed_size = chunk.len() as u32; + + let block_start = out.len(); + out.extend_from_slice(&[0u8; 16]); + out.push(LZ4_METHOD_BYTE); + out.extend_from_slice(&compressed_with_header.to_le_bytes()); + out.extend_from_slice(&uncompressed_size.to_le_bytes()); + out.extend_from_slice(&compressed); + + let checksum = ch_compression_checksum(&out[block_start + 16..]); + out[block_start..block_start + 16].copy_from_slice(&checksum); + } + out +} + #[cfg(test)] mod tests { use super::*; @@ -379,6 +448,7 @@ mod tests { let url = client.build_url(); assert!(url.contains("load_balancing=in_order")); assert!(url.contains("insert_distributed_sync")); + assert!(url.contains("decompress=1")); println!("running test"); let res = client.send(b"[]".to_vec(), RetryConfig::default()).await; println!("Response status {}", res.unwrap().status()); @@ -469,6 +539,85 @@ mod tests { assert!(url.contains("&max_insert_block_size=1048449")); } + /// Walks a buffer of concatenated ClickHouse-native compressed blocks, + /// verifies each block's header layout and CityHash128 checksum, and + /// returns the concatenated decompressed payload. Used by the roundtrip + /// tests below — kept as a helper so single-block and multi-block paths + /// share the same decoder. + fn decode_native_blocks(buf: &[u8]) -> Vec { + let mut decoded = Vec::new(); + let mut pos = 0; + while pos < buf.len() { + assert!(buf.len() - pos >= 25, "truncated block header"); + let stored_checksum: [u8; 16] = buf[pos..pos + 16].try_into().unwrap(); + assert_eq!(buf[pos + 16], LZ4_METHOD_BYTE, "wrong compression method"); + let compressed_with_header = + u32::from_le_bytes(buf[pos + 17..pos + 21].try_into().unwrap()) as usize; + let uncompressed_size = + u32::from_le_bytes(buf[pos + 21..pos + 25].try_into().unwrap()) as usize; + let block_end = pos + 16 + compressed_with_header; + assert!(block_end <= buf.len(), "block size overruns buffer"); + + let computed = ch_compression_checksum(&buf[pos + 16..block_end]); + assert_eq!(computed, stored_checksum, "checksum mismatch"); + + let chunk = lz4_flex::block::decompress(&buf[pos + 25..block_end], uncompressed_size) + .expect("decompress"); + assert_eq!(chunk.len(), uncompressed_size); + decoded.extend_from_slice(&chunk); + pos = block_end; + } + decoded + } + + /// Guards the cityhash-rs ↔ ClickHouse byte-order convention: the wire + /// puts the canonical "low" 64 bits first (LE), and `cityhash-rs` stores + /// that half in the upper 64 bits of its returned `u128`. Without the + /// rotate, this test fails AND CH would reject the body with + /// "Checksum doesn't match" — which is exactly how this bug first + /// surfaced (see the it_works integration test). + #[test] + fn test_compression_checksum_matches_clickhouse_wire_order() { + let data = b"snuba clickhouse native compressed block payload"; + let bytes = ch_compression_checksum(data); + + let wire_low = u64::from_le_bytes(bytes[..8].try_into().unwrap()); + let wire_high = u64::from_le_bytes(bytes[8..].try_into().unwrap()); + + let raw = cityhash_rs::cityhash_102_128(data); + // cityhash-rs convention: canonical "low" in upper bits, "high" in lower. + let canonical_low = (raw >> 64) as u64; + let canonical_high = raw as u64; + + assert_eq!(wire_low, canonical_low); + assert_eq!(wire_high, canonical_high); + } + + #[test] + fn test_lz4_compress_roundtrip_single_block() { + let mut input = b"INSERT INTO eap_items FORMAT RowBinary\n".to_vec(); + for i in 0..1024 { + input.push((i % 251) as u8); + } + assert!(input.len() < LZ4_BLOCK_SIZE); + + let compressed = lz4_compress(&input); + assert_eq!(decode_native_blocks(&compressed), input); + } + + #[test] + fn test_lz4_compress_chunks_at_block_size() { + // 2.5 blocks: exercises the chunking loop (3 blocks expected, last partial). + let input: Vec = (0..(LZ4_BLOCK_SIZE * 2 + LZ4_BLOCK_SIZE / 2)) + .map(|i| (i % 251) as u8) + .collect(); + + let compressed = lz4_compress(&input); + let decoded = decode_native_blocks(&compressed); + assert_eq!(decoded.len(), input.len()); + assert_eq!(decoded, input); + } + #[tokio::test] async fn test_retry_with_exponential_backoff() { crate::testutils::initialize_python();