Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust_snuba/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ adler = "1.0.2"
anyhow = { version = "1.0.69", features = ["backtrace"] }
cadence = "1.0.0"
chrono = { version = "0.4.26", features = ["serde"] }
cityhash-rs = "1.0.1"
ctrlc = { version = "3.2.5", features = ["termination"] }
data-encoding = "2.5.0"
futures = "0.3.21"
hyper = "1.2.0"
json-schema-diff = "0.1.7"
lz4_flex = "0.11"
md5 = "0.7.0"
parking_lot = "0.12.1"
procspawn = { version = "1.0.0", features = ["json"] }
Expand Down
157 changes: 153 additions & 4 deletions rust_snuba/src/strategies/clickhouse/writer_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,13 @@ impl ClickhouseClient {
let host = &config.host;
let port = &config.http_port;

let mut base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1");
// `decompress=1` tells ClickHouse the POST body is in its native
// compressed format (LZ4 blocks framed with CityHash128 checksums) —
// the same wire format `clickhouse-rs` used and what
// `clickhouse-compressor` produces. Distinct from HTTP-standard
// `Content-Encoding: lz4`, which would need `enable_http_compression=1`.
let mut base_url =
format!("{scheme}://{host}:{port}?insert_distributed_sync=1&decompress=1");
if matches!(format, InsertFormat::RowBinary) {
// RowBinary cannot represent JSON values natively; tell ClickHouse
// to treat any binary string targeting a JSON column as JSON text.
Expand Down Expand Up @@ -249,9 +255,10 @@ impl ClickhouseClient {
}

pub async fn send(&self, body: Vec<u8>, retry_config: RetryConfig) -> anyhow::Result<Response> {
// Convert to Bytes once for efficient cloning since sending the request
// moves the body into the request body.
let body_bytes = bytes::Bytes::from(body);
// Compress once before the retry loop — the encoded body is identical
// across attempts, so paying the LZ4 cost per attempt would be wasted
// work. `bytes::Bytes` makes the per-attempt clone cheap (refcount bump).
let body_bytes = bytes::Bytes::from(lz4_compress(&body));

for attempt in 0..=retry_config.max_retries {
let url = self.build_url();
Expand Down Expand Up @@ -337,6 +344,68 @@ impl ClickhouseClient {
}
}

/// Largest uncompressed payload, in bytes, packed into one ClickHouse native
/// compressed block (1 MiB). Equal to the server's default
/// `max_compress_block_size`; bigger blocks risk hitting server-side
/// decompression limits.
const LZ4_BLOCK_SIZE: usize = 1 << 20;

/// Method byte that marks a block as LZ4-compressed in the native block header.
const LZ4_METHOD_BYTE: u8 = 0x82;

/// Computes the block checksum in the byte order ClickHouse's
/// `CompressedReadBuffer` expects on the wire: the low 64-bit half as 8
/// little-endian bytes, followed by the high 64-bit half as 8 little-endian
/// bytes.
///
/// `cityhash-rs` hands back a `u128` whose halves are swapped relative to
/// that convention (the canonical "low" half sits in the upper 64 bits).
/// Serialising it directly with `to_le_bytes()` would therefore emit the
/// wrong half first, and ClickHouse would reject the body with
/// `CANNOT_DECOMPRESS / Checksum doesn't match`. A 64-bit rotation swaps the
/// halves into the expected order before serialising.
///
/// The hash is CityHash 1.0.2, the variant ClickHouse bundles for
/// compression checksums. The 110 variant is reserved for newer hash columns
/// and is NOT interchangeable here.
fn ch_compression_checksum(data: &[u8]) -> [u8; 16] {
    let digest: u128 = cityhash_rs::cityhash_102_128(data);
    // Swap the two 64-bit halves so the canonical low half is serialised first.
    let wire_order = digest.rotate_left(64);
    wire_order.to_le_bytes()
}

/// Serialises `input` into ClickHouse's native compressed format: the wire
/// shape that `clickhouse-rs` and `clickhouse-compressor` emit, and the one
/// the server expects when the URL carries `decompress=1`.
///
/// The output is zero or more blocks back to back, one per
/// `LZ4_BLOCK_SIZE`-sized slice of `input`. Each block is laid out as:
///
/// ```text
/// [0..16]   CityHash128(header || compressed), low half LE then high half LE
/// [16]      LZ4_METHOD_BYTE (0x82)
/// [17..21]  u32 LE: compressed size INCLUDING the 9-byte header
/// [21..25]  u32 LE: uncompressed size of this block
/// [25..]    raw LZ4 block bytes (no frame, no prepended size)
/// ```
///
/// The checksum covers the 9-byte header (method + both sizes) together with
/// the compressed bytes, so it guards both.
fn lz4_compress(input: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(input.len() / 2 + 32);

    for raw_block in input.chunks(LZ4_BLOCK_SIZE) {
        let payload = lz4_flex::block::compress(raw_block);
        let framed_len = 9u32 + payload.len() as u32;
        let raw_len = raw_block.len() as u32;

        // Reserve a zeroed 16-byte slot for the checksum; it is filled in
        // once the header and payload it covers have been written.
        let checksum_at = encoded.len();
        encoded.resize(checksum_at + 16, 0u8);
        let hashed_from = encoded.len();

        encoded.push(LZ4_METHOD_BYTE);
        encoded.extend_from_slice(&framed_len.to_le_bytes());
        encoded.extend_from_slice(&raw_len.to_le_bytes());
        encoded.extend_from_slice(&payload);

        let digest = ch_compression_checksum(&encoded[hashed_from..]);
        encoded[checksum_at..hashed_from].copy_from_slice(&digest);
    }

    encoded
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -379,6 +448,7 @@ mod tests {
let url = client.build_url();
assert!(url.contains("load_balancing=in_order"));
assert!(url.contains("insert_distributed_sync"));
assert!(url.contains("decompress=1"));
println!("running test");
let res = client.send(b"[]".to_vec(), RetryConfig::default()).await;
println!("Response status {}", res.unwrap().status());
Expand Down Expand Up @@ -469,6 +539,85 @@ mod tests {
assert!(url.contains("&max_insert_block_size=1048449"));
}

/// Test-side decoder for a buffer of concatenated ClickHouse-native
/// compressed blocks. For every block it checks the header layout and the
/// CityHash128 checksum, LZ4-decompresses the payload, and appends it to the
/// result. Shared by the roundtrip tests so the single-block and multi-block
/// paths go through the same decoder.
fn decode_native_blocks(buf: &[u8]) -> Vec<u8> {
    // 16 checksum bytes, then a 9-byte header (method byte + two u32 sizes).
    const CHECKSUM_LEN: usize = 16;
    const HEADER_LEN: usize = 9;

    let mut payload = Vec::new();
    let mut remaining = buf;

    while !remaining.is_empty() {
        assert!(
            remaining.len() >= CHECKSUM_LEN + HEADER_LEN,
            "truncated block header"
        );
        let (checksum_bytes, after_checksum) = remaining.split_at(CHECKSUM_LEN);
        let stored_checksum: [u8; 16] = checksum_bytes.try_into().unwrap();

        assert_eq!(after_checksum[0], LZ4_METHOD_BYTE, "wrong compression method");
        let framed_len = u32::from_le_bytes(after_checksum[1..5].try_into().unwrap()) as usize;
        let raw_len = u32::from_le_bytes(after_checksum[5..9].try_into().unwrap()) as usize;
        assert!(
            framed_len <= after_checksum.len(),
            "block size overruns buffer"
        );

        // `framed` is exactly the region the checksum covers: header + LZ4 bytes.
        let (framed, tail) = after_checksum.split_at(framed_len);
        let computed = ch_compression_checksum(framed);
        assert_eq!(computed, stored_checksum, "checksum mismatch");

        let block = lz4_flex::block::decompress(&framed[HEADER_LEN..], raw_len)
            .expect("decompress");
        assert_eq!(block.len(), raw_len);
        payload.extend_from_slice(&block);

        remaining = tail;
    }

    payload
}

/// Pins the byte-order convention between `cityhash-rs` and ClickHouse: on
/// the wire the canonical "low" 64 bits come first (little-endian), while
/// `cityhash-rs` keeps that half in the upper 64 bits of the `u128` it
/// returns. If the rotate is dropped this test fails, and ClickHouse would
/// reject the body with "Checksum doesn't match", which is how the bug
/// first surfaced (see the it_works integration test).
#[test]
fn test_compression_checksum_matches_clickhouse_wire_order() {
    let data = b"snuba clickhouse native compressed block payload";

    // What actually goes on the wire, split into its two 8-byte halves.
    let wire = ch_compression_checksum(data);
    let (first_half, second_half) = wire.split_at(8);
    let wire_low = u64::from_le_bytes(first_half.try_into().unwrap());
    let wire_high = u64::from_le_bytes(second_half.try_into().unwrap());

    // cityhash-rs convention: canonical "low" in upper bits, "high" in lower.
    let raw = cityhash_rs::cityhash_102_128(data);
    let canonical_low = (raw >> 64) as u64;
    let canonical_high = raw as u64;

    assert_eq!(wire_low, canonical_low);
    assert_eq!(wire_high, canonical_high);
}

/// Roundtrips an input smaller than one block through `lz4_compress` and the
/// test decoder, and checks the bytes come back unchanged.
#[test]
fn test_lz4_compress_roundtrip_single_block() {
    // A query-like prefix followed by 1024 bytes cycling through 0..251.
    let prefix = b"INSERT INTO eap_items FORMAT RowBinary\n";
    let input: Vec<u8> = prefix
        .iter()
        .copied()
        .chain((0..1024).map(|i| (i % 251) as u8))
        .collect();
    assert!(input.len() < LZ4_BLOCK_SIZE);

    let roundtripped = decode_native_blocks(&lz4_compress(&input));
    assert_eq!(roundtripped, input);
}

/// Verifies that an input larger than `LZ4_BLOCK_SIZE` is split into
/// multiple native blocks at exactly the block-size boundary, and that the
/// concatenated blocks decode back to the original bytes.
#[test]
fn test_lz4_compress_chunks_at_block_size() {
    // 2.5 blocks: exercises the chunking loop (3 blocks expected, last partial).
    let input: Vec<u8> = (0..(LZ4_BLOCK_SIZE * 2 + LZ4_BLOCK_SIZE / 2))
        .map(|i| (i % 251) as u8)
        .collect();

    let compressed = lz4_compress(&input);

    // Walk the block headers and record each block's declared uncompressed
    // size. The roundtrip check below would also pass if the encoder emitted
    // a single oversized block, so the chunk boundaries are pinned explicitly.
    let mut uncompressed_sizes = Vec::new();
    let mut pos = 0;
    while pos < compressed.len() {
        assert!(compressed.len() - pos >= 25, "truncated block header");
        let framed_len =
            u32::from_le_bytes(compressed[pos + 17..pos + 21].try_into().unwrap()) as usize;
        let raw_len =
            u32::from_le_bytes(compressed[pos + 21..pos + 25].try_into().unwrap()) as usize;
        uncompressed_sizes.push(raw_len);
        // 16 checksum bytes precede the region counted by `framed_len`.
        pos += 16 + framed_len;
    }
    assert_eq!(
        uncompressed_sizes,
        vec![LZ4_BLOCK_SIZE, LZ4_BLOCK_SIZE, LZ4_BLOCK_SIZE / 2],
        "input should be split into two full blocks and one half block"
    );

    let decoded = decode_native_blocks(&compressed);
    assert_eq!(decoded.len(), input.len());
    assert_eq!(decoded, input);
}

#[tokio::test]
async fn test_retry_with_exponential_backoff() {
crate::testutils::initialize_python();
Expand Down
Loading