Skip to content
Draft
82 changes: 52 additions & 30 deletions packages/rs-platform-wallet/src/wallet/core/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ where
event = "post_broadcast_reservation_leaked_until_sync",
%txid,
wallet_id = %hex::encode(wallet_id),
leaked_reservations = reservation.reserved_count(),
"leaking outpoint reservation: post-broadcast reconciliation failed"
);
reservation.leak_until_sync();
Expand Down Expand Up @@ -281,36 +282,15 @@ impl<B: TransactionBroadcaster + ?Sized> CoreWallet<B> {
});
}

// Pick the next change address. Peek (advance=false) first; if the
// peeked address is already pending from a concurrent in-flight
// send (CMT-006), advance the derivation index and peek again
// until we find one that is not pending. The final chosen
// address is committed (advance=true) inside this same write
// lock and inserted into the reservation set so a concurrent
// caller can never select the same change address. Advancing
// burns at most one index per concurrent in-flight send — a
// bounded, acceptable cost for privacy.
let pending_change = self.reservations.pending_change_snapshot();
let change_addr = loop {
let peeked = managed_account
.next_change_address(Some(&xpub), false)
.map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?;
if !pending_change.contains(&peeked) {
// Commit the advance now (under the same write lock as
// the outpoint reservation below). On broadcast failure
// a single index is burned — acceptable; on success the
// pending-change entry is released when the guard drops
// post-`check_core_transaction`.
let _ = managed_account
.next_change_address(Some(&xpub), true)
.map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?;
break peeked;
}
// Burn this index by advancing past it and try again.
let _ = managed_account
.next_change_address(Some(&xpub), true)
.map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?;
};
// Pick a change address no concurrent in-flight send has peeked,
// committing the index advance under this write lock. Recorded
// into `pending_change` via the outpoint reservation below so a
// concurrent caller can never select the same change address.
let change_addr = super::change_address::pick_and_reserve_change_address(
&self.reservations,
managed_account,
&xpub,
)?;

let mut builder = TransactionBuilder::new()
.set_current_height(current_height)
Expand Down Expand Up @@ -760,6 +740,48 @@ mod tests {
)
}

/// CW-002: a standalone change-address hand-out must not return a
/// change address that an in-flight `send_to_addresses` already peeked
/// into `pending_change`. The avoid-loop in `reserve_bip44_address`
/// skips it and hands out the next index instead.
#[tokio::test]
async fn next_change_address_skips_pending_change_reservation() {
let (wm, wallet_id, _recipient, _signer) = build_funded_wallet_manager(2_000_000);
let cw =
make_core_wallet_for_manager(Arc::clone(&wm), wallet_id, Arc::new(FailingBroadcaster));

// Peek (advance=false) the change address the bridge would pick
// first, without committing the index or bridge-reserving it.
let pending = {
let mut guard = wm.write().await;
let (wallet, info) = guard.get_wallet_and_info_mut(&wallet_id).unwrap();
let xpub = wallet
.accounts
.standard_bip44_accounts
.get(&0)
.unwrap()
.account_xpub;
let managed = info
.core_wallet
.accounts
.standard_bip44_accounts
.get_mut(&0)
.unwrap();
managed.next_change_address(Some(&xpub), false).unwrap()
};

// Mark it pending, as the broadcast loop does before a send confirms.
let _guard = cw.reservations.reserve(vec![], Some(pending.clone()));
assert!(cw.reservations.change_address_pending(&pending));

// A standalone hand-out must steer clear of the pending address.
let handed_out = cw.next_change_address_for_account(0).await.unwrap();
assert_ne!(
handed_out, pending,
"change hand-out returned an address already pending from an in-flight send"
);
}

/// Two concurrent `send_to_addresses` calls on one wallet with one UTXO must yield exactly
/// one broadcast. The loser must get [`PlatformWalletError::NoSpendableInputs`] — never
/// `TransactionBroadcast` (that would mean it reached the network, which is the bug closed).
Expand Down
53 changes: 53 additions & 0 deletions packages/rs-platform-wallet/src/wallet/core/change_address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//! Shared change-address selection for the broadcast and DashPay payment
//! send paths.
//!
//! Both build a transaction under the wallet write lock and must pick a
//! BIP-44 change address that no concurrent in-flight send has already
//! peeked into `pending_change`. The selection rule is identical at both
//! sites, so it lives here once.

use dashcore::Address;
use key_wallet::bip32::ExtendedPubKey;
use key_wallet::managed_account::managed_core_funds_account::ManagedCoreFundsAccount;

use super::reservations::OutpointReservations;
use crate::error::PlatformWalletError;

/// Pick the next change address that is not already pending from a
/// concurrent in-flight send, committing the derivation-index advance
/// under the caller's wallet write lock.
///
/// Peeks `next_change_address(.., advance=false)`; if the peeked address is
/// in the `pending_change` snapshot it advances past the index
/// (`advance=true`) and retries, otherwise it commits the advance and
/// returns the peeked address. Advancing burns at most one index per
/// concurrent in-flight send — a bounded, acceptable privacy cost; on
/// broadcast failure a single index is burned, also acceptable. Index
/// reuse is not.
///
/// The caller must hold the wallet write lock across this call and record
/// the returned address into `reservations.pending_change` (via
/// `reserve(.., Some(addr))`) before releasing the lock, so a concurrent
/// caller cannot select it.
pub(crate) fn pick_and_reserve_change_address(
reservations: &OutpointReservations,
managed_account: &mut ManagedCoreFundsAccount,
xpub: &ExtendedPubKey,
) -> Result<Address, PlatformWalletError> {
let pending_change = reservations.pending_change_snapshot();
loop {
let peeked = managed_account
.next_change_address(Some(xpub), false)
.map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?;
// Commit the advance under the write lock the caller holds. On
// broadcast failure a single index is burned; on success the
// pending-change entry is released when the reservation guard drops
// post-`check_core_transaction`.
let _ = managed_account
.next_change_address(Some(xpub), true)
.map_err(|e| PlatformWalletError::TransactionBuild(e.to_string()))?;
if !pending_change.contains(&peeked) {
return Ok(peeked);
}
}
}
1 change: 1 addition & 0 deletions packages/rs-platform-wallet/src/wallet/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod balance;
pub mod balance_handler;
pub(crate) mod broadcast;
pub(crate) mod change_address;
pub(crate) mod reservations;
pub mod wallet;

Expand Down
25 changes: 25 additions & 0 deletions packages/rs-platform-wallet/src/wallet/core/reservations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
//! that releases keys on drop, with `release_after_commit` and
//! `leak_until_sync` escape hatches mirroring the broadcast-success /
//! unrecoverable-leak distinction.
//!
//! This is one of two reservation models in platform-wallet, with a
//! distinct reclaim discipline: here reservations are released on guard
//! **drop** (RAII), bound to a single in-flight broadcast. The sibling
//! [`address_reserve`](super::super::platform_addresses::address_reserve)
//! is a process-global table reclaimed by a **TTL sweep**, for receive /
//! change address hand-outs that have no single owning scope. A change to
//! one's reclaim model should be weighed against the other.

use std::collections::HashSet;
use std::hash::Hash;
Expand Down Expand Up @@ -162,6 +170,12 @@ impl<K: Eq + Hash + Clone + std::fmt::Debug> ReservationGuard<K> {
std::mem::forget(self);
}

/// Number of keys this guard holds reserved. Lets the leak path log
/// how many reservations were pinned until restart.
pub(crate) fn reserved_count(&self) -> usize {
self.keys.len()
}

fn do_release(&mut self) {
let mut inner = self
.reservations
Expand Down Expand Up @@ -282,6 +296,17 @@ impl OutpointReservationGuard {
g.leak_until_sync();
}
}

/// Total slots this guard holds reserved (outpoints plus an optional
/// change address). Lets the leak path report how many reservations
/// are pinned until restart.
pub(crate) fn reserved_count(&self) -> usize {
self.outpoint_guard.reserved_count()
+ self
.change_guard
.as_ref()
.map_or(0, ReservationGuard::reserved_count)
}
}

#[cfg(test)]
Expand Down
Loading
Loading