Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions aggregator_core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ pub enum BatchMode {
},
}

impl BatchMode {
/// Returns the [`BatchConfig`] message representation of this batch mode, for inclusion in a
/// [`TaskConfiguration`](janus_messages::TaskConfiguration).
///
/// This is the inverse of [`BatchMode::try_from(&BatchConfig)`](TryFrom). The
/// `batch_time_window_size` of [`BatchMode::LeaderSelected`] is a Janus-specific parameter that
/// is not part of the DAP batch configuration, and is silently dropped. Therefore, the HPKE AAD
/// does not force the aggregators to agree on the `batch_time_window_size`.
pub fn to_batch_config(&self) -> BatchConfig {
match self {
BatchMode::TimeInterval => BatchConfig::TimeInterval,
BatchMode::LeaderSelected { .. } => BatchConfig::LeaderSelected,
}
}
}

impl TryFrom<batch_mode::Code> for BatchMode {
type Error = Error;

Expand Down Expand Up @@ -1392,8 +1408,8 @@ mod tests {
vdaf::vdaf_dp_strategies,
};
use janus_messages::{
Duration, HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId, HpkePublicKey,
TaskId, Time, TimePrecision,
BatchConfig, Duration, HpkeAeadId, HpkeConfig, HpkeConfigId, HpkeKdfId, HpkeKemId,
HpkePublicKey, TaskId, Time, TimePrecision,
};
use rand::random;
use serde_json::json;
Expand All @@ -1407,6 +1423,33 @@ mod tests {
},
};

#[test]
fn batch_mode_to_batch_config() {
assert_eq!(
BatchMode::TimeInterval.to_batch_config(),
BatchConfig::TimeInterval
);
// The Janus-specific batch_time_window_size is dropped, so both leader-selected variants
// map to the same BatchConfig.
assert_eq!(
BatchMode::LeaderSelected {
batch_time_window_size: None,
}
.to_batch_config(),
BatchConfig::LeaderSelected
);
assert_eq!(
BatchMode::LeaderSelected {
batch_time_window_size: Some(Duration::from_seconds(
3600,
&TimePrecision::from_seconds(3600)
)),
}
.to_batch_config(),
BatchConfig::LeaderSelected
);
}

#[test]
fn leader_task_serialization() {
roundtrip_encoding(
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod hpke;
pub mod http;
pub mod report_id;
pub mod retries;
pub mod task_config;
#[cfg(feature = "test-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "test-util")))]
pub mod test_util;
Expand Down
200 changes: 200 additions & 0 deletions core/src/task_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//! Construction of canonical [`TaskConfiguration`] messages from a task's internal parameters.
//!
//! In DAP-18, the task's [`TaskConfiguration`] is bound into the HPKE additional authenticated data
//! (AAD) for input shares and aggregate shares. Every party (both aggregators, the client, and the
//! collector) must independently reconstruct *byte-identical* [`TaskConfiguration`] bytes, or all
//! decryption fails. [`build_task_configuration`] is the single canonical construction path that
//! maps internal representations onto the wire format in exactly one place.
//!
//! Endpoint URLs are kept as [`janus_messages::Url`] — the raw bytes from the wire — and
//! bound directly, per DAP-18 §4.1.

use janus_messages::{
BatchConfig, Error, Interval, TaskConfiguration, TaskExtension, TimePrecision, Url as DapUrl,
VdafConfig,
};

/// Construct a canonical [`TaskConfiguration`] from a task's parameters. Endpoints are bound
/// verbatim from their wire bytes (no normalization — see the module docs).
#[allow(clippy::too_many_arguments)]
pub fn build_task_configuration(
task_info: Vec<u8>,
leader_aggregator_endpoint: DapUrl,
helper_aggregator_endpoint: DapUrl,
time_precision: TimePrecision,
min_batch_size: u64,
batch_config: BatchConfig,
vdaf_config: VdafConfig,
task_interval: Option<Interval>,
) -> Result<TaskConfiguration, Error> {
// The optional task_interval becomes the lone extension, or none.
let extensions = Vec::from_iter(task_interval.map(TaskExtension::TaskInterval));

TaskConfiguration::new(
task_info,
leader_aggregator_endpoint,
helper_aggregator_endpoint,
time_precision,
min_batch_size,
batch_config,
vdaf_config,
extensions,
)
}

#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use janus_messages::{
BatchConfig, Duration, Error, Interval, Time, TimePrecision, Url as DapUrl, VdafConfig,
};
use prio::codec::Encode as _;

use super::build_task_configuration;

fn leader() -> DapUrl {
DapUrl::try_from("https://leader.example.com/".as_bytes()).unwrap()
}

fn helper() -> DapUrl {
DapUrl::try_from("https://helper.example.com/".as_bytes()).unwrap()
}

#[test]
fn endpoints_bound_verbatim() {
// A path-bearing endpoint with no trailing slash must be bound exactly as given: this code
// performs no normalization (forbidden by DAP-18 §4.1).
let config = build_task_configuration(
b"task".to_vec(),
DapUrl::try_from("https://leader.example.com/dap".as_bytes()).unwrap(),
DapUrl::try_from("https://helper.example.com/dap".as_bytes()).unwrap(),
TimePrecision::from_seconds(3600),
100,
BatchConfig::TimeInterval,
VdafConfig::Prio3Count,
None,
)
.unwrap();
assert_eq!(
config.leader_aggregator_endpoint().to_string(),
"https://leader.example.com/dap"
);
assert_eq!(
config.helper_aggregator_endpoint().to_string(),
"https://helper.example.com/dap"
);
}

#[test]
fn no_task_interval() {
let config = build_task_configuration(
b"task".to_vec(),
leader(),
helper(),
TimePrecision::from_seconds(3600),
100,
BatchConfig::TimeInterval,
VdafConfig::Prio3Count,
None,
)
.unwrap();
assert!(config.extensions().is_empty());
assert_eq!(config.task_interval(), None);
}

#[test]
fn carries_task_interval() {
let interval = Interval::new(
Time::from_time_precision_units(1000),
Duration::from_time_precision_units(28),
)
.unwrap();
let config = build_task_configuration(
b"task".to_vec(),
leader(),
helper(),
TimePrecision::from_seconds(3600),
100,
BatchConfig::TimeInterval,
VdafConfig::Prio3Count,
Some(interval),
)
.unwrap();
assert_eq!(config.task_interval(), Some(interval));
}

#[test]
fn rejects_empty_task_info() {
assert_matches!(
build_task_configuration(
Vec::new(),
leader(),
helper(),
TimePrecision::from_seconds(3600),
100,
BatchConfig::TimeInterval,
VdafConfig::Prio3Count,
None,
),
Err(Error::InvalidParameter(_))
);
}

/// Pins the exact encoded bytes produced by the canonical builder. Because DAP implementations
/// can be non-Janus, this synthesized encoding is a cross-implementation wire-compatibility
/// contract, so we assert the bytes directly rather than only round-tripping.
#[test]
fn encoded_test_vector() {
let time_precision = TimePrecision::from_seconds(3600);
let config = build_task_configuration(
b"foobar".to_vec(),
DapUrl::try_from("https://example.com/".as_bytes()).unwrap(),
DapUrl::try_from("https://another.example.com/".as_bytes()).unwrap(),
time_precision,
10000,
BatchConfig::TimeInterval,
VdafConfig::Prio3Count,
Some(
Interval::new(
Time::from_time_precision_units(1000000),
Duration::from_time_precision_units(28),
)
.unwrap(),
),
)
.unwrap();

assert_eq!(
hex::encode(config.get_encoded().unwrap()),
concat!(
// task_info: length 0x06, "foobar"
"06",
"666f6f626172",
// leader_aggregator_endpoint: length 0x0014, "https://example.com/"
"0014",
"68747470733a2f2f6578616d706c652e636f6d2f",
// helper_aggregator_endpoint: length 0x001c, "https://another.example.com/"
"001c",
"68747470733a2f2f616e6f746865722e6578616d706c652e636f6d2f",
// time_precision: 3600
"0000000000000e10",
// min_batch_size: 10000
"0000000000002710",
// batch_config: TimeInterval (mode 0x01, empty config)
"01",
"0000",
// vdaf_config: Prio3Count (type 0x00000001, empty config)
"00000001",
"0000",
// extensions: u16 length prefix, then one task_interval extension
"0014",
// extension_type: task_interval (0x0001)
"0001",
// extension_data: u16 length 0x10, Interval{start: 1000000, duration: 28}
"0010",
"00000000000f4240",
"000000000000001c",
)
);
}
}
Loading
Loading