Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ lightning-invoice = { path = "../lightning-invoice" }
lightning-liquidity = { path = "../lightning-liquidity" }
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" }
lightning-persister = { path = "../lightning-persister", features = ["tokio"]}
lightning_0_2 = { package = "lightning", version = "0.2.0", features = ["_test_utils"] }
bech32 = "0.11.0"
bitcoin = { version = "0.32.4", features = ["secp-lowmemory"] }
tokio = { version = "~1.35", default-features = false, features = ["rt-multi-thread"] }
Expand Down
232 changes: 193 additions & 39 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@ impl FeeEstimator for FuzzEstimator {
}
}

impl lightning_0_2::chain::chaininterface::FeeEstimator for FuzzEstimator {
fn get_est_sat_per_1000_weight(
&self, conf_target: lightning_0_2::chain::chaininterface::ConfirmationTarget,
) -> u32 {
match conf_target {
lightning_0_2::chain::chaininterface::ConfirmationTarget::MaximumFeeEstimate
| lightning_0_2::chain::chaininterface::ConfirmationTarget::UrgentOnChainSweep => {
MAX_FEE
},
lightning_0_2::chain::chaininterface::ConfirmationTarget::ChannelCloseMinimum
| lightning_0_2::chain::chaininterface::ConfirmationTarget::AnchorChannelFee
| lightning_0_2::chain::chaininterface::ConfirmationTarget::MinAllowedAnchorChannelRemoteFee
| lightning_0_2::chain::chaininterface::ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee
| lightning_0_2::chain::chaininterface::ConfirmationTarget::OutputSpendingFee => 253,
lightning_0_2::chain::chaininterface::ConfirmationTarget::NonAnchorChannelFee => {
let val = self.ret_val.load(atomic::Ordering::Relaxed);
cmp::min(val, MAX_FEE)
},
}
}
}

impl FuzzEstimator {
fn feerate_sat_per_kw(&self) -> FeeRate {
let feerate = self.ret_val.load(atomic::Ordering::Acquire);
Expand Down Expand Up @@ -182,6 +204,14 @@ impl BroadcasterInterface for TestBroadcaster {
}
}

impl lightning_0_2::chain::chaininterface::BroadcasterInterface for TestBroadcaster {
fn broadcast_transactions(&self, txs: &[&bitcoin::Transaction]) {
for tx in txs {
self.txn_broadcasted.borrow_mut().push((*tx).clone());
}
}
}

struct ChainState {
blocks: Vec<(Header, Vec<Transaction>)>,
confirmed_txids: HashSet<Txid>,
Expand Down Expand Up @@ -290,34 +320,49 @@ impl TestChainMonitor {
latest_monitors: Mutex::new(new_hash_map()),
}
}
}
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(channel_id, monitor);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: ser.0,
pending_monitors: Vec::new(),
},
Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: Vec::new(),
pending_monitors: vec![(monitor_id, ser.0)],
},
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
fn do_watch_channel_bytes(
&self, channel_id_bytes: [u8; 32], monitor_id: u64, serialized_monitor: Vec<u8>,
) {
let channel_id = ChannelId(channel_id_bytes);
let state = LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: serialized_monitor,
pending_monitors: Vec::new(),
};
if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() {
let mut latest_monitors = self.latest_monitors.lock().unwrap();
if latest_monitors.insert(channel_id, state).is_some() {
panic!("Already had monitor pre-watch_channel");
}
res
}
}
impl lightning::chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self, channel_id: lightning::ln::types::ChannelId,
monitor: channelmonitor::ChannelMonitor<TestChannelSigner>,
) -> Result<chain::ChannelMonitorUpdateStatus, ()> {
let mut ser = VecWriter(Vec::new());
lightning::util::ser::Writeable::write(&(BlockHash::all_zeros(), monitor.clone()), &mut ser,).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(channel_id, monitor);
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: ser.0,
pending_monitors: Vec::new(),
},
Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState {
persisted_monitor_id: monitor_id,
persisted_monitor: ser.0.clone(),
pending_monitors: vec![(monitor_id, ser.0)],
},
Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(),
Err(()) => panic!(),
};
if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() {
panic!("Already had monitor pre-watch_channel");
}
res
}

fn update_channel(
&self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate,
Expand All @@ -330,13 +375,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
.as_ref()
.map(|(_, data)| data)
.unwrap_or(&map_entry.persisted_monitor);
let deserialized_monitor =
<(BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>)>::read(
&mut &latest_monitor_data[..],
(&*self.keys, &*self.keys),
)
.unwrap()
.1;
let deserialized_monitor: (BlockHash, channelmonitor::ChannelMonitor<TestChannelSigner>) =
ReadableArgs::read(&mut &latest_monitor_data[..], (&*self.keys, &*self.keys)).unwrap();
let deserialized_monitor = deserialized_monitor.1;
deserialized_monitor
.update_monitor(
update,
Expand All @@ -346,7 +387,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
)
.unwrap();
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
lightning::util::ser::Writeable::write(
&(BlockHash::all_zeros(), deserialized_monitor), &mut ser,
).unwrap();
let res = self.chain_monitor.update_channel(channel_id, update);
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
Expand All @@ -368,6 +411,40 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
}
}

impl lightning_0_2::chain::Watch<lightning_0_2::util::test_channel_signer::TestChannelSigner> for TestChainMonitor {
fn watch_channel(
&self,
channel_id: lightning_0_2::ln::types::ChannelId,
monitor: lightning_0_2::chain::channelmonitor::ChannelMonitor<lightning_0_2::util::test_channel_signer::TestChannelSigner>
) -> Result<lightning_0_2::chain::ChannelMonitorUpdateStatus, ()> {
let monitor_id = monitor.get_latest_update_id();
let mut ser = Vec::new();
lightning_0_2::util::ser::Writeable::write(
&(BlockHash::all_zeros(), monitor.clone()),
&mut ser,
).unwrap();
self.do_watch_channel_bytes(channel_id.0, monitor_id, ser);
Ok(lightning_0_2::chain::ChannelMonitorUpdateStatus::Completed)
}
fn update_channel(
&self, _channel_id: lightning_0_2::ln::types::ChannelId,
_update: &lightning_0_2::chain::channelmonitor::ChannelMonitorUpdate,
) -> lightning_0_2::chain::ChannelMonitorUpdateStatus {
lightning_0_2::chain::ChannelMonitorUpdateStatus::Completed
}

fn release_pending_monitor_events(
&self,
) -> Vec<(
lightning_0_2::chain::transaction::OutPoint,
lightning_0_2::ln::types::ChannelId,
Vec<lightning_0_2::chain::channelmonitor::MonitorEvent>,
PublicKey,
)> {
Vec::new()
}
}

struct KeyProvider {
node_secret: SecretKey,
rand_bytes_id: atomic::AtomicU32,
Expand All @@ -385,6 +462,33 @@ impl EntropySource for KeyProvider {
}
}

impl lightning_0_2::sign::EntropySource for KeyProvider {
fn get_secure_random_bytes(&self) -> [u8; 32] {
lightning::sign::EntropySource::get_secure_random_bytes(self)
}
}

impl lightning_0_2::sign::SignerProvider for KeyProvider {
type EcdsaSigner = lightning_0_2::util::test_channel_signer::TestChannelSigner;

fn generate_channel_keys_id(&self, _inbound: bool, _user_channel_id: u128) -> [u8; 32] {
let id = self.rand_bytes_id.fetch_add(1, atomic::Ordering::Relaxed) as u8;
[id; 32]
}

fn derive_channel_signer(&self, _channel_keys_id: [u8; 32]) -> Self::EcdsaSigner {
unreachable!()
}

fn get_destination_script(&self, _channel_keys_id: [u8; 32]) -> Result<bitcoin::ScriptBuf, ()> {
unreachable!()
}

fn get_shutdown_scriptpubkey(&self) -> Result<lightning_0_2::ln::script::ShutdownScript, ()> {
unreachable!()
}
}

impl NodeSigner for KeyProvider {
fn get_node_id(&self, recipient: Recipient) -> Result<PublicKey, ()> {
let node_secret = match recipient {
Expand Down Expand Up @@ -916,7 +1020,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
logger.clone(),
$fee_estimator.clone(),
Arc::new(TestPersister {
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
}),
Arc::clone(&keys_manager),
));
Expand Down Expand Up @@ -999,12 +1103,47 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
};
// Use a different value of `use_old_mons` if we have another monitor (only for node B)
// by shifting `use_old_mons` one in base-3.
let cross_version_round_trip = use_old_mons % 3 == 2;
use_old_mons /= 3;
let mon = <(BlockHash, ChannelMonitor<TestChannelSigner>)>::read(
let mut serialized_mon = serialized_mon;
if cross_version_round_trip {
let old_mon_res: Result<
(
BlockHash,
lightning_0_2::chain::channelmonitor::ChannelMonitor<
lightning_0_2::util::test_channel_signer::TestChannelSigner
>
),
_
> = lightning_0_2::util::ser::ReadableArgs::read(
&mut &serialized_mon[..],
(&**keys, &**keys),
)
.expect("Failed to read monitor");
);

if let Ok((block_hash, old_monitor)) = old_mon_res {
let mut out = Vec::new();

if lightning_0_2::util::ser::Writeable::write(
&(block_hash, old_monitor), &mut out
).is_ok() {

let check_res: Result<(BlockHash, ChannelMonitor<TestChannelSigner>), _> =
lightning::util::ser::ReadableArgs::read(
&mut &out[..], (&**keys, &**keys)
);

if check_res.is_ok() {
serialized_mon = out;
}
}
}
}

let mon: (BlockHash, ChannelMonitor<TestChannelSigner>) =
lightning::util::ser::ReadableArgs::read(
&mut &serialized_mon[..],
(&**keys, &**keys),
).expect("Failed to read monitor");
monitors.insert(channel_id, mon.1);
// Update the latest `ChannelMonitor` state to match what we just told LDK.
prev_state.persisted_monitor = serialized_mon;
Expand Down Expand Up @@ -1034,8 +1173,8 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
channel_monitors: monitor_refs,
};

let manager =
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
let manager: (BlockHash, ChanMan) =
lightning::util::ser::ReadableArgs::read(&mut &ser[..], read_args).expect("Failed to read manager");
let res = (manager.1, chain_monitor.clone());
for (channel_id, mon) in monitors.drain() {
assert_eq!(
Expand Down Expand Up @@ -1341,6 +1480,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let chan_a_id = chan_ab_ids[0];
let chan_b_id = chan_bc_ids[0];


*monitor_a.persister.update_ret.lock().unwrap() = *mon_style[0].borrow();
*monitor_b.persister.update_ret.lock().unwrap() = *mon_style[1].borrow();
*monitor_c.persister.update_ret.lock().unwrap() = *mon_style[2].borrow();

let mut p_ctr: u64 = 0;

let mut peers_ab_disconnected = false;
Expand Down Expand Up @@ -2657,7 +2801,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
() => { {
let mut last_pass_no_updates = false;
for i in 0..std::usize::MAX {
if i == 100 {
if i == 10000 {
panic!("It may take may iterations to settle the state, but it should not take forever");
}
// Next, make sure no monitor updates are pending
Expand Down Expand Up @@ -2718,6 +2862,16 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
for node in &nodes {
node.timer_tick_occurred();
}

for id in &chan_ab_ids {
complete_all_monitor_updates(&monitor_a, id);
complete_all_monitor_updates(&monitor_b, id);
}
for id in &chan_bc_ids {
complete_all_monitor_updates(&monitor_b, id);
complete_all_monitor_updates(&monitor_c, id);
}

process_all_events!();

// Verify no payments are stuck - all should have resolved
Expand Down
Loading