diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 4a884ab2a6..f5d4b3723f 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -45,4 +45,4 @@ jobs: echo "ELECTRS_EXE=$( pwd )/bin/electrs-${{ runner.os }}-${{ runner.arch }}" >> "$GITHUB_ENV" - name: Run benchmarks run: | - RUSTFLAGS="--cfg tokio_unstable" cargo bench + RUSTFLAGS="--cfg tokio_unstable" cargo test --benches --features bench diff --git a/Cargo.toml b/Cargo.toml index 9f1c257cb8..83f6596ca0 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ panic = 'abort' # Abort on panic [features] default = [] +bench = [] postgres = ["dep:tokio-postgres", "dep:native-tls", "dep:postgres-native-tls"] [dependencies] @@ -141,6 +142,15 @@ check-cfg = [ name = "payments" harness = false +[[bench]] +name = "operations" +harness = false + +[[bench]] +name = "database" +harness = false +required-features = ["bench"] + #[patch.crates-io] #lightning = { path = "../rust-lightning/lightning" } #lightning-types = { path = "../rust-lightning/lightning-types" } diff --git a/benches/README.md b/benches/README.md new file mode 100644 index 0000000000..c2a5a4e593 --- /dev/null +++ b/benches/README.md @@ -0,0 +1,69 @@ +# Benchmarks + +This directory holds [Criterion](https://docs.rs/criterion) benchmarks for LDK Node. There are +three benchmark targets: + +| Target | What it measures | +|--------|------------------| +| `database` | Low-level `KVStore` operations (single read/write/remove, batched & concurrent writes, paginated listing, payment reload) across the filesystem, SQLite and PostgreSQL backends. | +| `operations` | Higher-level node operations: payment forwarding, channel opens, and node startup time seeded with varying channel/payment history. | +| `payments` | End-to-end payment throughput between two nodes across store backends. | + +The `operations` and `payments` benchmarks spin up real `bitcoind`/`electrs` instances and full +nodes, so they take a while to run. The `database` benchmark only exercises the storage layer and +is comparatively quick. + +## Prerequisites + +- `bitcoind` and `electrs` binaries for the `operations` and `payments` benchmarks. Point the + `BITCOIND_EXE` and `ELECTRS_EXE` environment variables at them (see + `scripts/download_bitcoind_electrs.sh` for a convenient way to fetch them). +- The `bench` feature is required for the `database` benchmark. +- The `postgres` feature plus a reachable PostgreSQL server (set `TEST_POSTGRES_URL`) are required + to include the PostgreSQL backend. Without it, the filesystem and SQLite backends still run. + +## Running + +Run everything for real measurements: + +```sh +RUSTFLAGS="--cfg tokio_unstable" cargo bench --benches --features bench +``` + +`--cfg tokio_unstable` is optional; it enables the tokio eager driver handoff used by the benchmark +runtimes. Without it the benchmarks still run. + +Run a single target: + +```sh +cargo bench --bench database --features bench +cargo bench --bench operations +cargo bench --bench payments +``` + +Filter to specific cases (Criterion takes a substring filter; the `operations` target also uses it +to skip expensive setup for cases that don't match): + +```sh +cargo bench --bench operations -- channel_open +cargo bench --bench database --features bench -- sqlite +``` + +### PostgreSQL backend + +```sh +export TEST_POSTGRES_URL="host=localhost user=postgres password=postgres" +cargo bench --benches --features "bench postgres" +``` + +Benchmark fixtures create their tables in the `ldk_db` database (the default used when the +connection string omits a `dbname`) and drop them on teardown. + +## CI + +The `benchmarks` workflow smoke-runs the benchmarks with `cargo test --benches --features bench` (a +single iteration each) to keep them compiling and working rather than collecting full measurements. +It does not require PostgreSQL; PostgreSQL coverage is opt-in for local runs by enabling the +`postgres` feature and setting `TEST_POSTGRES_URL`. The seeded `startup` scenarios in `operations` +detect CI via the `CI` environment variable and only run the smallest scenario there; the full set +runs locally. diff --git a/benches/common/db_store.rs b/benches/common/db_store.rs new file mode 100644 index 0000000000..357a7c97a4 --- /dev/null +++ b/benches/common/db_store.rs @@ -0,0 +1,656 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Shared fixtures and helpers for the database (`KVStore`) micro-benchmarks. +//! +//! This lives under `benches/` rather than in the library so the benchmark scaffolding does not +//! bloat the crate's public surface. It deliberately relies only on `ldk-node`'s public API. + +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use bitcoin::hashes::Hash; +use bitcoin::Txid; +use lightning::impl_writeable_tlv_based; +use lightning::ln::channelmanager::PaymentId; +use lightning::util::persist::{KVStore, PaginatedKVStore}; +use lightning::util::ser::{Readable, Writeable}; +use lightning_persister::fs_store::v2::FilesystemStoreV2; +use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; + +use ldk_node::io::sqlite_store::SqliteStore; +use ldk_node::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; + +#[cfg(feature = "postgres")] +use ldk_node::io::postgres_store::{PostgresStore, DEFAULT_DB_NAME, POSTGRES_TEST_URL_ENV_VAR}; + +// The persistence namespaces the node uses for (pending) payments. Re-declared here so the +// benchmark exercises realistic keys without depending on crate-internal constants. +const PAYMENT_INFO_PRIMARY_NAMESPACE: &str = "payments"; +const PAYMENT_INFO_SECONDARY_NAMESPACE: &str = ""; +const PENDING_PAYMENT_INFO_PRIMARY_NAMESPACE: &str = "pending_payments"; +const PENDING_PAYMENT_INFO_SECONDARY_NAMESPACE: &str = ""; + +pub const BATCH_LEN: u64 = 100; +pub const LARGE_PAYMENT_SET_LEN: u64 = 10_000; +pub const PAGINATED_PAGE_LEN: u64 = 50; + +static NEXT_STORE_ID: AtomicU64 = AtomicU64::new(0); + +#[derive(Clone)] +pub struct PaymentUpdateBatch(Vec); + +#[derive(Clone)] +pub struct PendingPaymentBatch(Vec); + +#[derive(Clone)] +pub struct PendingPaymentUpdateBatch(Vec); + +#[derive(Clone)] +struct PendingPaymentDetails { + details: PaymentDetails, + conflicting_txids: Vec, +} + +impl_writeable_tlv_based!(PendingPaymentDetails, { + (0, details, required), + (2, conflicting_txids, optional_vec), +}); + +#[derive(Clone, Copy)] +pub enum Backend { + Filesystem, + Sqlite, + #[cfg(feature = "postgres")] + Postgres, +} + +impl Backend { + pub fn name(self) -> &'static str { + match self { + Self::Filesystem => "filesystem", + Self::Sqlite => "sqlite", + #[cfg(feature = "postgres")] + Self::Postgres => "postgres", + } + } +} + +/// A concrete store wrapped for cheap cloning into concurrent benchmark tasks. +/// +/// We dispatch over the concrete backends instead of using a trait object so the benchmark only +/// touches `ldk-node`'s public API. +#[derive(Clone)] +enum BenchStore { + Filesystem(Arc), + Sqlite(Arc), + #[cfg(feature = "postgres")] + Postgres(Arc), +} + +impl BenchStore { + async fn write(&self, primary: &str, secondary: &str, key: &str, buf: Vec) { + match self { + Self::Filesystem(store) => { + KVStore::write(store.as_ref(), primary, secondary, key, buf).await.unwrap() + }, + Self::Sqlite(store) => { + KVStore::write(store.as_ref(), primary, secondary, key, buf).await.unwrap() + }, + #[cfg(feature = "postgres")] + Self::Postgres(store) => { + KVStore::write(store.as_ref(), primary, secondary, key, buf).await.unwrap() + }, + } + } + + async fn read(&self, primary: &str, secondary: &str, key: &str) -> Vec { + match self { + Self::Filesystem(store) => { + KVStore::read(store.as_ref(), primary, secondary, key).await.unwrap() + }, + Self::Sqlite(store) => { + KVStore::read(store.as_ref(), primary, secondary, key).await.unwrap() + }, + #[cfg(feature = "postgres")] + Self::Postgres(store) => KVStore::read(store.as_ref(), primary, secondary, key).await.unwrap(), + } + } + + async fn remove(&self, primary: &str, secondary: &str, key: &str) { + match self { + Self::Filesystem(store) => { + KVStore::remove(store.as_ref(), primary, secondary, key, false).await.unwrap() + }, + Self::Sqlite(store) => { + KVStore::remove(store.as_ref(), primary, secondary, key, false).await.unwrap() + }, + #[cfg(feature = "postgres")] + Self::Postgres(store) => { + KVStore::remove(store.as_ref(), primary, secondary, key, false).await.unwrap() + }, + } + } + + async fn list(&self, primary: &str, secondary: &str) -> Vec { + match self { + Self::Filesystem(store) => { + KVStore::list(store.as_ref(), primary, secondary).await.unwrap() + }, + Self::Sqlite(store) => KVStore::list(store.as_ref(), primary, secondary).await.unwrap(), + #[cfg(feature = "postgres")] + Self::Postgres(store) => KVStore::list(store.as_ref(), primary, secondary).await.unwrap(), + } + } +} + +pub struct StoreFixture { + store: BenchStore, + storage_path: PathBuf, + #[cfg(feature = "postgres")] + postgres_cleanup: Option, +} + +impl StoreFixture { + pub async fn new(backend: Backend, benchmark_name: &str) -> Self { + let store_id = next_store_id(); + let storage_path = bench_storage_path(backend, benchmark_name, &store_id); + #[cfg(feature = "postgres")] + let mut postgres_cleanup = None; + + let store = match backend { + Backend::Filesystem => BenchStore::Filesystem(Arc::new( + FilesystemStoreV2::new(storage_path.clone()).unwrap(), + )), + Backend::Sqlite => BenchStore::Sqlite(Arc::new( + SqliteStore::new(storage_path.clone(), None, None).unwrap(), + )), + #[cfg(feature = "postgres")] + Backend::Postgres => { + let connection_string = postgres_connection_string(); + let table_name = postgres_table_name(&store_id); + let store = PostgresStore::new( + connection_string.clone(), + None, + Some(table_name.clone()), + None, + ) + .await + .unwrap(); + postgres_cleanup = Some(PostgresCleanup { connection_string, table_name }); + BenchStore::Postgres(Arc::new(store)) + }, + }; + + Self { + store, + storage_path, + #[cfg(feature = "postgres")] + postgres_cleanup, + } + } + + pub async fn write_payment_batch(&self, payments: Vec) { + for payment in payments { + self.write_payment_details(payment).await; + } + } + + pub async fn write_payment_batch_from_offset(&self, offset: u64) { + self.write_payment_batch(payment_details_batch(offset)).await; + } + + pub async fn write_payment(&self, idx: u64) { + self.write_payment_details(payment_details(idx)).await; + } + + pub async fn write_payment_update(&self, idx: u64) { + self.write_payment_details(payment_update(idx)).await; + } + + pub async fn write_payment_update_batch(&self, updates: PaymentUpdateBatch) { + for payment in updates.0 { + self.write_payment_details(payment).await; + } + } + + pub async fn read_payment(&self, idx: u64) -> Vec { + self.store + .read( + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + &payment_key(idx), + ) + .await + } + + pub async fn remove_payment_key(&self, key: &str) { + self.store + .remove(PAYMENT_INFO_PRIMARY_NAMESPACE, PAYMENT_INFO_SECONDARY_NAMESPACE, key) + .await; + } + + pub async fn reload_payments(&self) -> Vec { + let keys = + self.store.list(PAYMENT_INFO_PRIMARY_NAMESPACE, PAYMENT_INFO_SECONDARY_NAMESPACE).await; + let mut payments = Vec::with_capacity(keys.len()); + for key in keys { + let bytes = self + .store + .read(PAYMENT_INFO_PRIMARY_NAMESPACE, PAYMENT_INFO_SECONDARY_NAMESPACE, &key) + .await; + payments.push(PaymentDetails::read(&mut &bytes[..]).unwrap()); + } + payments + } + + pub async fn write_payment_batch_concurrent(&self, offset: u64, same_key: bool) { + let mut handles = Vec::with_capacity(BATCH_LEN as usize); + for idx in offset..offset + BATCH_LEN { + let store = self.store.clone(); + let payment = payment_details(idx); + let key = if same_key { + "shared_payment_key".to_string() + } else { + payment_key_hex(&payment.id) + }; + handles.push(tokio::spawn(async move { + store + .write( + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .await; + })); + } + for handle in handles { + handle.await.unwrap(); + } + } + + pub async fn write_pending_payment_batch(&self, payments: PendingPaymentBatch) { + for payment in payments.0 { + self.write_pending_payment_details(payment).await; + } + } + + pub async fn write_pending_payment_update_batch(&self, updates: PendingPaymentUpdateBatch) { + for payment in updates.0 { + self.write_pending_payment_details(payment).await; + } + } + + pub async fn insert_update_read_payment(&self, idx: u64) -> Vec { + self.write_payment_details(payment_details(idx)).await; + self.write_payment_details(payment_update(idx)).await; + self.read_payment(idx).await + } + + async fn write_payment_details(&self, payment: PaymentDetails) { + self.store + .write( + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + &payment_key_hex(&payment.id), + payment.encode(), + ) + .await; + } + + async fn write_pending_payment_details(&self, payment: PendingPaymentDetails) { + self.store + .write( + PENDING_PAYMENT_INFO_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_SECONDARY_NAMESPACE, + &payment_key_hex(&payment.details.id), + payment.encode(), + ) + .await; + } +} + +impl Drop for StoreFixture { + fn drop(&mut self) { + #[cfg(feature = "postgres")] + if let Some(cleanup) = self.postgres_cleanup.take() { + tokio::spawn(async move { + cleanup.drop_table().await; + }); + return; + } + + let _ = std::fs::remove_dir_all(&self.storage_path); + } +} + +pub struct PaginatedStoreFixture { + inner: PaginatedStoreFixtureInner, +} + +enum PaginatedStoreFixtureInner { + Filesystem { + store: FilesystemStoreV2, + storage_path: PathBuf, + }, + Sqlite { + store: SqliteStore, + storage_path: PathBuf, + }, + #[cfg(feature = "postgres")] + Postgres { + store: PostgresStore, + runtime: Arc, + cleanup: PostgresCleanup, + }, +} + +impl PaginatedStoreFixture { + pub async fn new( + backend: Backend, benchmark_name: &str, _runtime: Arc, + ) -> Option { + let store_id = next_store_id(); + let storage_path = bench_storage_path(backend, benchmark_name, &store_id); + + match backend { + Backend::Filesystem => { + let store = FilesystemStoreV2::new(storage_path.clone()).unwrap(); + populate_async_store(&store).await; + Some(Self { inner: PaginatedStoreFixtureInner::Filesystem { store, storage_path } }) + }, + Backend::Sqlite => { + let store = SqliteStore::new(storage_path.clone(), None, None).unwrap(); + populate_async_store(&store).await; + Some(Self { inner: PaginatedStoreFixtureInner::Sqlite { store, storage_path } }) + }, + #[cfg(feature = "postgres")] + Backend::Postgres => { + let connection_string = postgres_connection_string(); + let table_name = postgres_table_name(&store_id); + let store = PostgresStore::new( + connection_string.clone(), + None, + Some(table_name.clone()), + None, + ) + .await + .unwrap(); + populate_async_store(&store).await; + let cleanup = PostgresCleanup { connection_string, table_name }; + Some(Self { + inner: PaginatedStoreFixtureInner::Postgres { + store, + runtime: _runtime, + cleanup, + }, + }) + }, + } + } + + pub async fn list_first_page(&self) -> usize { + match &self.inner { + PaginatedStoreFixtureInner::Filesystem { store, .. } => list_first_page(store).await, + PaginatedStoreFixtureInner::Sqlite { store, .. } => list_first_page(store).await, + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { store, .. } => list_first_page(store).await, + } + } + + pub async fn list_second_page(&self) -> usize { + match &self.inner { + PaginatedStoreFixtureInner::Filesystem { store, .. } => list_second_page(store).await, + PaginatedStoreFixtureInner::Sqlite { store, .. } => list_second_page(store).await, + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { store, .. } => list_second_page(store).await, + } + } +} + +impl Drop for PaginatedStoreFixture { + fn drop(&mut self) { + match &mut self.inner { + PaginatedStoreFixtureInner::Filesystem { storage_path, .. } + | PaginatedStoreFixtureInner::Sqlite { storage_path, .. } => { + let _ = std::fs::remove_dir_all(storage_path); + }, + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { runtime, cleanup, .. } => { + let cleanup = cleanup.clone(); + runtime.spawn(async move { + cleanup.drop_table().await; + }); + }, + } + } +} + +async fn list_first_page(store: &S) -> usize { + PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap() + .keys + .len() +} + +async fn list_second_page(store: &S) -> usize { + let first_page = PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap(); + let page_token = first_page.next_page_token.expect("first page should have a next token"); + PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + Some(page_token), + ) + .await + .unwrap() + .keys + .len() +} + +pub fn configured_backends() -> Vec { + #[cfg(feature = "postgres")] + { + let mut backends = vec![Backend::Filesystem, Backend::Sqlite]; + if std::env::var(POSTGRES_TEST_URL_ENV_VAR).is_ok() { + backends.push(Backend::Postgres); + } + backends + } + + #[cfg(not(feature = "postgres"))] + { + vec![Backend::Filesystem, Backend::Sqlite] + } +} + +async fn populate_async_store(store: &S) { + for idx in 0..LARGE_PAYMENT_SET_LEN { + let payment = payment_details(idx); + KVStore::write( + store, + PAYMENT_INFO_PRIMARY_NAMESPACE, + PAYMENT_INFO_SECONDARY_NAMESPACE, + &payment_key_hex(&payment.id), + payment.encode(), + ) + .await + .unwrap(); + } +} + +fn next_store_id() -> String { + let store_id = NEXT_STORE_ID.fetch_add(1, Ordering::Relaxed); + format!("{}_{}", std::process::id(), store_id) +} + +fn bench_storage_path(backend: Backend, benchmark_name: &str, store_id: &str) -> PathBuf { + let mut path = std::env::temp_dir(); + path.push("ldk-node-db-benches"); + path.push(backend.name()); + path.push(benchmark_name); + path.push(store_id); + path +} + +#[cfg(feature = "postgres")] +fn postgres_connection_string() -> String { + std::env::var(POSTGRES_TEST_URL_ENV_VAR).unwrap_or_else(|_| { + panic!("{POSTGRES_TEST_URL_ENV_VAR} must be set to run postgres database benchmarks") + }) +} + +#[cfg(feature = "postgres")] +#[derive(Clone)] +struct PostgresCleanup { + connection_string: String, + table_name: String, +} + +#[cfg(feature = "postgres")] +impl PostgresCleanup { + async fn drop_table(&self) { + // The fixtures create their tables via `PostgresStore::new(.., None, ..)`, which resolves the + // database name to `DEFAULT_DB_NAME` when the connection string omits a `dbname`. Mirror that + // resolution here so cleanup connects to the same database the tables actually live in. + let Ok(mut config) = self.connection_string.parse::() else { + return; + }; + if config.get_dbname().is_none() { + config.dbname(DEFAULT_DB_NAME); + } + let Ok((client, connection)) = config.connect(tokio_postgres::NoTls).await else { + return; + }; + tokio::spawn(async move { + let _ = connection.await; + }); + + let table_name = postgres_identifier(&self.table_name); + let _ = client.execute(&format!("DROP TABLE IF EXISTS {table_name}"), &[]).await; + } +} + +#[cfg(feature = "postgres")] +fn postgres_table_name(store_id: &str) -> String { + format!("ldk_node_bench_{store_id}") +} + +#[cfg(feature = "postgres")] +fn postgres_identifier(identifier: &str) -> String { + format!("\"{}\"", identifier.replace('"', "\"\"")) +} + +pub fn payment_details_batch(offset: u64) -> Vec { + (offset..offset + BATCH_LEN).map(payment_details).collect() +} + +pub fn payment_update_batch_from_offset(offset: u64) -> PaymentUpdateBatch { + PaymentUpdateBatch((offset..offset + BATCH_LEN).map(payment_update).collect()) +} + +pub fn pending_payment_details_batch_from_offset(offset: u64) -> PendingPaymentBatch { + PendingPaymentBatch( + (offset..offset + BATCH_LEN) + .map(|idx| { + let txid = Txid::from_byte_array(filled_bytes(idx + 1)); + PendingPaymentDetails { + details: payment_details(idx), + conflicting_txids: vec![txid], + } + }) + .collect(), + ) +} + +pub fn pending_payment_update_batch_from_offset(offset: u64) -> PendingPaymentUpdateBatch { + PendingPaymentUpdateBatch( + (offset..offset + BATCH_LEN) + .map(|idx| { + let txid = Txid::from_byte_array(filled_bytes(idx + 3)); + PendingPaymentDetails { + details: payment_update(idx), + conflicting_txids: vec![txid], + } + }) + .collect(), + ) +} + +fn payment_details(idx: u64) -> PaymentDetails { + PaymentDetails { + id: payment_id(idx), + kind: PaymentKind::Bolt11 { + hash: PaymentHash(filled_bytes(idx + 2)), + preimage: Some(PaymentPreimage(filled_bytes(idx + 1))), + secret: Some(PaymentSecret(filled_bytes(idx + 3))), + counterparty_skimmed_fee_msat: None, + }, + amount_msat: Some(10_000 + idx), + fee_paid_msat: None, + direction: PaymentDirection::Outbound, + status: PaymentStatus::Pending, + latest_update_timestamp: 0, + } +} + +/// The same payment as [`payment_details`], updated to a succeeded state. Writing this over an +/// existing entry models the common "update the payment we just persisted" workload. +fn payment_update(idx: u64) -> PaymentDetails { + let id = payment_id(idx); + PaymentDetails { + id, + kind: PaymentKind::Bolt11 { + hash: PaymentHash(id.0), + preimage: Some(PaymentPreimage(filled_bytes(idx + 2))), + secret: Some(PaymentSecret(id.0)), + counterparty_skimmed_fee_msat: None, + }, + amount_msat: Some(10_000), + fee_paid_msat: Some(42), + direction: PaymentDirection::Outbound, + status: PaymentStatus::Succeeded, + latest_update_timestamp: 0, + } +} + +fn payment_id(idx: u64) -> PaymentId { + PaymentId(filled_bytes(idx)) +} + +pub fn payment_key(idx: u64) -> String { + payment_key_hex(&payment_id(idx)) +} + +fn payment_key_hex(id: &PaymentId) -> String { + let mut s = String::with_capacity(id.0.len() * 2); + for byte in id.0.iter() { + s.push_str(&format!("{byte:02x}")); + } + s +} + +fn filled_bytes(idx: u64) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&idx.to_be_bytes()); + bytes[8..16].copy_from_slice(&(idx.wrapping_mul(0x9e37_79b9)).to_be_bytes()); + bytes +} diff --git a/benches/database.rs b/benches/database.rs new file mode 100644 index 0000000000..20ae622dfe --- /dev/null +++ b/benches/database.rs @@ -0,0 +1,399 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +#[path = "common/db_store.rs"] +mod db_store; + +use db_store::{ + configured_backends, payment_details_batch, payment_key, payment_update_batch_from_offset, + pending_payment_details_batch_from_offset, pending_payment_update_batch_from_offset, Backend, + PaginatedStoreFixture, StoreFixture, BATCH_LEN, PAGINATED_PAGE_LEN, +}; + +fn database_benchmark(c: &mut Criterion) { + let backends = configured_backends(); + + benchmark_payment_store_single_ops(c, &backends); + benchmark_payment_store_warm_sequential(c, &backends); + benchmark_payment_store_concurrent(c, &backends); + benchmark_payment_store(c, &backends); + benchmark_payment_store_paginated(c, &backends); + benchmark_payment_store_lifecycle(c, &backends); + benchmark_pending_payment_store(c, &backends); +} + +fn benchmark_payment_store(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let insert_payments = payment_details_batch(0); + group.bench_with_input( + BenchmarkId::new("insert_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let insert_payments = insert_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let insert_payments = insert_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_insert").await; + let start = Instant::now(); + fixture.write_payment_batch(insert_payments.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let update_payments = payment_details_batch(0); + let updates = payment_update_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("update_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let update_payments = update_payments.clone(); + let updates = updates.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let update_payments = update_payments.clone(); + let updates = updates.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_update").await; + fixture.write_payment_batch(update_payments.clone()).await; + let start = Instant::now(); + fixture.write_payment_update_batch(updates.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let reload_payments = payment_details_batch(0); + group.bench_with_input( + BenchmarkId::new("reload_100_cold", backend.name()), + &backend, + |b, backend| { + let reload_payments = reload_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let reload_payments = reload_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_reload").await; + fixture.write_payment_batch(reload_payments.clone()).await; + let start = Instant::now(); + let payments = fixture.reload_payments().await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payments); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_paginated(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_paginated"); + group.throughput(Throughput::Elements(PAGINATED_PAGE_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let Some(fixture) = runtime.block_on(PaginatedStoreFixture::new( + backend, + "payment_list_page", + Arc::clone(&runtime), + )) else { + continue; + }; + let fixture = Arc::new(fixture); + + let runner = Arc::clone(&runtime); + let first_page_fixture = Arc::clone(&fixture); + group.bench_function(BenchmarkId::new("list_page_from_10k", backend.name()), |b| { + b.to_async(runner.as_ref()).iter_custom(|iters| { + let first_page_fixture = Arc::clone(&first_page_fixture); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let page_len = first_page_fixture.list_first_page().await; + elapsed += start.elapsed(); + debug_assert_eq!(page_len, PAGINATED_PAGE_LEN as usize); + // Keep the page result observable when debug assertions are compiled out. + std::hint::black_box(page_len); + } + elapsed + } + }) + }); + let runner = Arc::clone(&runtime); + let second_page_fixture = Arc::clone(&fixture); + group.bench_function(BenchmarkId::new("list_second_page_from_10k", backend.name()), |b| { + b.to_async(runner.as_ref()).iter_custom(|iters| { + let second_page_fixture = Arc::clone(&second_page_fixture); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let page_len = second_page_fixture.list_second_page().await; + elapsed += start.elapsed(); + debug_assert_eq!(page_len, PAGINATED_PAGE_LEN as usize); + // Keep the page result observable when debug assertions are compiled out. + std::hint::black_box(page_len); + } + elapsed + } + }) + }); + } + group.finish(); +} + +fn benchmark_pending_payment_store(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/pending_payment_store"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let insert_payments = pending_payment_details_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("insert_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let insert_payments = insert_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let insert_payments = insert_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = + StoreFixture::new(backend, "pending_payment_insert").await; + let start = Instant::now(); + fixture.write_pending_payment_batch(insert_payments.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let update_payments = pending_payment_details_batch_from_offset(0); + let updates = pending_payment_update_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("update_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let update_payments = update_payments.clone(); + let updates = updates.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let update_payments = update_payments.clone(); + let updates = updates.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = + StoreFixture::new(backend, "pending_payment_update").await; + fixture.write_pending_payment_batch(update_payments.clone()).await; + let start = Instant::now(); + fixture.write_pending_payment_update_batch(updates.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_single_ops(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_single"); + group.throughput(Throughput::Elements(1)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("write_new_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_write_new_key").await; + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let start = Instant::now(); + fixture.write_payment(idx).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("write_existing_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_update_existing_key").await; + fixture.write_payment(0).await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + fixture.write_payment_update(0).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("read_existing_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_read_existing_key").await; + fixture.write_payment(0).await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let payment = fixture.read_payment(0).await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payment); + } + elapsed + }) + }); + + group.bench_with_input( + BenchmarkId::new("remove_existing_key", backend.name()), + &backend, + |b, backend| { + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + async move { + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let fixture = + StoreFixture::new(backend, "single_remove_existing_key").await; + fixture.write_payment(idx).await; + let key = payment_key(idx); + let start = Instant::now(); + fixture.remove_payment_key(&key).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_warm_sequential(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_warm"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_100_sequential", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_sequential_warm").await; + let mut elapsed = Duration::ZERO; + for iter in 0..iters { + let offset = iter * BATCH_LEN; + let start = Instant::now(); + fixture.write_payment_batch_from_offset(offset).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_payment_store_concurrent(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_concurrent"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_100_distinct_keys", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_concurrent_distinct").await; + let mut elapsed = Duration::ZERO; + for iter in 0..iters { + let offset = iter * BATCH_LEN; + let start = Instant::now(); + fixture.write_payment_batch_concurrent(offset, false).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("insert_100_same_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_concurrent_same_key").await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + fixture.write_payment_batch_concurrent(0, true).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_payment_store_lifecycle(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_lifecycle"); + group.throughput(Throughput::Elements(1)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_update_read", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "payment_lifecycle").await; + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let start = Instant::now(); + let payment = fixture.insert_update_read_payment(idx).await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payment); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_runtime() -> Arc { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(2).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + Arc::new(builder.build().unwrap()) +} + +criterion_group!(benches, database_benchmark); +criterion_main!(benches); diff --git a/benches/operations.rs b/benches/operations.rs new file mode 100644 index 0000000000..592b2c65a5 --- /dev/null +++ b/benches/operations.rs @@ -0,0 +1,624 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +#[path = "../tests/common/mod.rs"] +mod common; + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use bitcoin::Amount; +use common::{ + expect_channel_pending_event, expect_channel_ready_event, expect_event, + generate_blocks_and_wait, open_channel_to_pending, premine_and_distribute_funds, random_config, + random_storage_path, setup_bitcoind_and_electrsd, setup_node, setup_two_nodes_with_store, + store_bench_configs, wait_for_channel_ready_events, wait_for_payment_success, +}; +use criterion::{criterion_group, criterion_main, Criterion}; +use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD}; +use ldk_node::io::sqlite_store::{SqliteStore, KV_TABLE_NAME, SQLITE_DB_FILE_NAME}; +use ldk_node::{Event, Node}; +use lightning::ln::channelmanager::PaymentId; +use lightning::util::persist::migrate_kv_store_data_async; +use lightning_invoice::{Bolt11InvoiceDescription, Description}; +use lightning_persister::fs_store::v2::FilesystemStoreV2; + +use crate::common::{ + open_channel_push_amt, StoreBenchConfig, TestChainSource, TestConfig, TestStoreType, +}; + +#[cfg(feature = "postgres")] +use ldk_node::io::postgres_store::{PostgresStore, POSTGRES_TEST_URL_ENV_VAR}; + +const STARTUP_SEED_SCENARIOS: [StartupSeedScenario; 6] = [ + StartupSeedScenario { channel_count: 1, payment_count: 2 }, + StartupSeedScenario { channel_count: 1, payment_count: 100 }, + StartupSeedScenario { channel_count: 1, payment_count: 1_000 }, + StartupSeedScenario { channel_count: 10, payment_count: 2 }, + StartupSeedScenario { channel_count: 100, payment_count: 2 }, + StartupSeedScenario { channel_count: 100, payment_count: 1_000 }, +]; +const STARTUP_SEED_PAYMENT_AMOUNT_MSAT: u64 = 1_000_000; +const STARTUP_SEED_MIN_CHANNEL_FUNDING_SAT: u64 = 100_000; +const STARTUP_SEED_CHANNEL_BUFFER_SAT: u64 = 1_000_000; +const STARTUP_SEED_CHANNEL_BATCH_SIZE: u64 = 2; + +#[derive(Clone, Copy)] +struct StartupSeedScenario { + channel_count: u64, + payment_count: u64, +} + +impl StartupSeedScenario { + fn bench_name(self, store_name: &str) -> String { + format!("{}/channels_{}_payments_{}", store_name, self.channel_count, self.payment_count) + } + + fn runs_in_ci(self) -> bool { + self.channel_count == 1 && self.payment_count == 2 + } + + fn channel_funding_sat(self) -> u64 { + let payment_amount_sat = STARTUP_SEED_PAYMENT_AMOUNT_MSAT / 1_000; + let payment_funding_sat = + self.payment_count * payment_amount_sat + STARTUP_SEED_CHANNEL_BUFFER_SAT; + payment_funding_sat.max(STARTUP_SEED_MIN_CHANNEL_FUNDING_SAT) + } + + fn premine_amount_sat(self) -> u64 { + self.channel_count * self.channel_funding_sat() + STARTUP_SEED_CHANNEL_BUFFER_SAT + } +} + +fn operations_benchmark(c: &mut Criterion) { + forwarding_benchmark(c); + channel_open_benchmark(c); + startup_benchmark(c); +} + +fn forwarding_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("forwarding"); + group.sample_size(10); + + for store_config in store_bench_configs() { + if !should_register_bench("forwarding", store_config.name) { + continue; + } + let nodes = setup_forwarding_nodes( + &chain_source, + &bitcoind, + &electrsd, + store_config.store_type, + &runtime, + ); + let nodes = Arc::new(nodes); + + group.bench_function(store_config.name, |b| { + b.to_async(&runtime).iter_custom(|iter| { + let nodes = Arc::clone(&nodes); + + async move { + let mut total = Duration::ZERO; + for _ in 0..iter { + total += send_forwarded_payments(Arc::clone(&nodes)).await; + } + total + } + }); + }); + } +} + +fn benchmark_runtime() -> tokio::runtime::Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(4).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + builder.build().unwrap() +} + +fn channel_open_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("channel_open"); + group.sample_size(10); + + for store_config in store_bench_configs() { + if !should_register_bench("channel_open", store_config.name) { + continue; + } + let (node_a, node_b) = + setup_two_nodes_with_store(&chain_source, false, true, false, store_config.store_type); + let node_a = Arc::new(node_a); + let node_b = Arc::new(node_b); + + // connect nodes + node_a + .connect( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + true, + ) + .unwrap(); + + runtime.block_on(async { + let address_a = node_a.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(35_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + }); + + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + let bitcoind_client = &bitcoind.client; + let electrsd_ref = &electrsd; + + group.bench_function(store_config.name, |b| { + b.to_async(&runtime).iter_custom(|iter| { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + + async move { + let mut total = Duration::ZERO; + for _ in 0..iter { + total += open_channel( + Arc::clone(&node_a), + Arc::clone(&node_b), + bitcoind_client, + electrsd_ref, + ) + .await; + } + total + } + }); + }); + } +} + +fn startup_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("startup"); + group.sample_size(10); + + for startup_seed_scenario in STARTUP_SEED_SCENARIOS { + // Larger seeded startup scenarios are useful locally, but take too long to run in CI. + if is_ci() && !startup_seed_scenario.runs_in_ci() { + continue; + } + + let matching_store_configs: Vec<_> = store_bench_configs() + .into_iter() + .filter(|store_config| { + let bench_name = startup_seed_scenario.bench_name(store_config.name); + should_register_bench("startup", &bench_name) + }) + .collect(); + if matching_store_configs.is_empty() { + continue; + } + + // Seed a canonical sqlite node once, then copy its store into each backend under test. This + // keeps the channel/payment history identical across stores while avoiding repeated expensive + // channel and payment setup for every store backend. + let seeded_config = setup_startup_seed_node( + &chain_source, + &bitcoind, + &electrsd, + startup_seed_scenario, + &runtime, + ); + let startup_configs = migrate_startup_seed_configs( + &seeded_config, + startup_seed_scenario, + matching_store_configs, + &runtime, + ); + + for (bench_name, config) in startup_configs { + group.bench_function(bench_name, |b| { + b.iter_custom(|iter| { + let mut total = Duration::ZERO; + for _ in 0..iter { + let start = Instant::now(); + let node = setup_node(&chain_source, config.clone()); + total += start.elapsed(); + node.stop().unwrap(); + } + total + }); + }); + } + } +} + +/// Builds a canonical sqlite node store with the requested channel and payment history. +/// +/// Startup benchmarks use this store as the source fixture for every backend so differences in +/// measured startup time come from loading equivalent persisted state, not from different setup +/// runs. +fn setup_startup_seed_node( + chain_source: &TestChainSource, bitcoind: &BitcoinD, electrsd: &electrsd::ElectrsD, + seed_scenario: StartupSeedScenario, runtime: &tokio::runtime::Runtime, +) -> TestConfig { + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let node_a = Arc::new(setup_node(chain_source, config_a.clone())); + + let mut config_b = random_config(true); + config_b.store_type = TestStoreType::Sqlite; + let node_b = Arc::new(setup_node(chain_source, config_b)); + + runtime.block_on(async { + let address_a = node_a.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(seed_scenario.premine_amount_sat()), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let funding_amount_sat = seed_scenario.channel_funding_sat(); + let mut remaining_channel_count = seed_scenario.channel_count; + while remaining_channel_count > 0 { + let channel_batch_size = remaining_channel_count.min(STARTUP_SEED_CHANNEL_BATCH_SIZE); + for _ in 0..channel_batch_size { + node_a + .open_channel( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + funding_amount_sat, + None, + None, + ) + .unwrap(); + assert!(node_a.list_peers().iter().any(|peer| peer.node_id == node_b.node_id())); + + let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); + let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); + assert_eq!(funding_txo_a, funding_txo_b); + node_a.sync_wallets().unwrap(); + } + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + for _ in 0..channel_batch_size { + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + wait_for_channel_ready_events(&node_a, node_b.node_id(), 1).await; + wait_for_channel_ready_events(&node_b, node_a.node_id(), 1).await; + } + remaining_channel_count -= channel_batch_size; + } + + for idx in 0..seed_scenario.payment_count { + let invoice_description = Bolt11InvoiceDescription::Direct( + Description::new(format!("startup seed {}", idx + 1)).unwrap(), + ); + let invoice = node_b + .bolt11_payment() + .receive(STARTUP_SEED_PAYMENT_AMOUNT_MSAT, &invoice_description.into(), 9217) + .unwrap(); + let payment_id = node_a.bolt11_payment().send(&invoice, None).unwrap(); + wait_for_payment_success(&node_a, payment_id).await; + } + + drain_events(&node_a); + drain_events(&node_b); + }); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + + config_a +} + +/// Produces benchmark configs backed by copies of the canonical seeded store. +/// +/// Sqlite can reuse the source store directly. Other store backends get a fresh storage path and a +/// migrated copy of the same key-value data. +fn migrate_startup_seed_configs( + source_config: &TestConfig, seed_scenario: StartupSeedScenario, + store_configs: Vec, runtime: &tokio::runtime::Runtime, +) -> Vec<(String, TestConfig)> { + // Open the seeded source store with the same db file and table the node itself uses, otherwise + // we'd read from an empty default-named store and migrate nothing into the other backends. + let source_store = SqliteStore::new( + source_config.node_config.storage_dir_path.clone().into(), + Some(SQLITE_DB_FILE_NAME.to_string()), + Some(KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + store_configs + .into_iter() + .map(|store_config| { + let mut config = source_config.clone(); + config.store_type = store_config.store_type; + if !matches!(store_config.store_type, TestStoreType::Sqlite) { + config.node_config.storage_dir_path = + random_storage_path().to_str().unwrap().to_owned(); + migrate_startup_seed_store(&source_store, &config, runtime); + } + + (seed_scenario.bench_name(store_config.name), config) + }) + .collect() +} + +fn migrate_startup_seed_store( + source_store: &SqliteStore, destination_config: &TestConfig, runtime: &tokio::runtime::Runtime, +) { + runtime.block_on(async { + match destination_config.store_type { + TestStoreType::Sqlite => {}, + TestStoreType::FilesystemStore => { + let destination_store = FilesystemStoreV2::new( + destination_config.node_config.storage_dir_path.clone().into(), + ) + .unwrap(); + migrate_kv_store_data_async(source_store, &destination_store).await.unwrap(); + }, + #[cfg(feature = "postgres")] + TestStoreType::Postgres => { + let connection_string = postgres_connection_string(); + let table_name = postgres_table_name(destination_config); + let destination_store = + PostgresStore::new(connection_string, None, Some(table_name), None) + .await + .unwrap(); + migrate_kv_store_data_async(source_store, &destination_store).await.unwrap(); + }, + TestStoreType::TestSyncStore => { + unreachable!("startup benches do not use TestSyncStore") + }, + } + }); +} + +#[cfg(feature = "postgres")] +fn postgres_connection_string() -> String { + std::env::var(POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()) +} + +#[cfg(feature = "postgres")] +fn postgres_table_name(config: &TestConfig) -> String { + format!( + "test_{}", + config + .node_config + .storage_dir_path + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .collect::() + ) +} + +/// Returns whether the benchmark identified by `group/name` matches the CLI filters. +/// +/// Criterion applies its own filters after benchmark registration, but these benches do expensive +/// setup before registration. Pre-filtering here avoids setting up benchmark cases that cannot run. +/// Only non-flag arguments are considered filters, matching either the full target substring or the +/// group name. +fn should_register_bench(group: &str, name: &str) -> bool { + let target = format!("{}/{}", group, name); + let filters: Vec = + std::env::args().skip(1).filter(|arg| !arg.starts_with('-')).collect(); + filters.is_empty() + || filters.iter().any(|filter| { + target.contains(filter) || (filter == group && target.starts_with(&format!("{group}/"))) + }) +} + +fn is_ci() -> bool { + std::env::var("CI").is_ok_and(|value| !value.is_empty() && value != "0" && value != "false") +} + +fn setup_forwarding_nodes( + chain_source: &TestChainSource, bitcoind: &BitcoinD, electrsd: &electrsd::ElectrsD, + store_type: TestStoreType, runtime: &tokio::runtime::Runtime, +) -> Vec> { + let mut nodes = Vec::new(); + for _ in 0..3 { + let mut config = random_config(true); + config.store_type = store_type; + nodes.push(Arc::new(setup_node(chain_source, config))); + } + + runtime.block_on(async { + let addresses = + nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + addresses, + Amount::from_sat(5_000_000), + ) + .await; + for node in &nodes { + node.sync_wallets().unwrap(); + } + + let funding_amount_sat = 1_000_000; + let push_amount_msat = None; + open_channel_push_amt( + &nodes[0], + &nodes[1], + funding_amount_sat, + push_amount_msat, + true, + electrsd, + ) + .await; + open_channel_push_amt( + &nodes[1], + &nodes[2], + funding_amount_sat, + push_amount_msat, + true, + electrsd, + ) + .await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + for node in &nodes { + node.sync_wallets().unwrap(); + } + + expect_event!(nodes[0], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[2], ChannelReady); + + tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_forwarding_path(&nodes).await; + }); + + nodes +} + +async fn send_forwarded_payments(nodes: Arc>>) -> Duration { + let total_payments = 25; + let amount_msat = 5_000; + + let mut total = Duration::ZERO; + + for _ in 0..total_payments { + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("forwarding".to_string()).unwrap()); + let invoice = nodes[2] + .bolt11_payment() + .receive(amount_msat, &invoice_description.into(), 9217) + .unwrap(); + + let start = Instant::now(); + let payment_id = nodes[0].bolt11_payment().send(&invoice, None).unwrap(); + total += wait_for_forwarded_payment(&nodes, payment_id, start).await; + } + + // return funds and clean up for next run + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("return".to_string()).unwrap()); + let invoice = nodes[0] + .bolt11_payment() + .receive(amount_msat * total_payments, &invoice_description.into(), 9217) + .unwrap(); + if let Ok(return_payment_id) = nodes[2].bolt11_payment().send(&invoice, None) { + wait_for_payment_success(&nodes[2], return_payment_id).await + } + tokio::time::sleep(Duration::from_millis(10)).await; + for node in nodes.iter() { + drain_events(node); + } + + total +} + +async fn wait_for_forwarded_payment( + nodes: &[Arc], expected_payment_id: PaymentId, start: Instant, +) -> Duration { + let mut payment_successful = false; + let mut payment_forwarded = false; + + while !payment_successful || !payment_forwarded { + tokio::select! { + event = nodes[0].next_event_async(), if !payment_successful => { + match event { + Event::PaymentSuccessful { payment_id: Some(payment_id), .. } + if payment_id == expected_payment_id => + { + payment_successful = true; + }, + Event::PaymentFailed { payment_id, payment_hash, .. } => { + nodes[0].event_handled().unwrap(); + panic!("Forwarded payment {payment_id:?} failed with hash {payment_hash:?}"); + }, + _ => {}, + } + nodes[0].event_handled().unwrap(); + }, + event = nodes[1].next_event_async(), if !payment_forwarded => { + if matches!(event, Event::PaymentForwarded { .. }) { + payment_forwarded = true; + } + nodes[1].event_handled().unwrap(); + }, + } + } + + start.elapsed() +} + +/// Sends a payment across the benchmark path before measurements start. +/// +/// Channel readiness events alone do not guarantee that the sender can immediately find and use the +/// intended multi-hop path. Waiting for one successful payment keeps route-discovery first-use cost +/// and transient graph propagation failures out of the timed forwarding loop. +async fn wait_for_forwarding_path(nodes: &[Arc]) { + for _ in 0..30 { + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("".to_string()).unwrap()); + let invoice = + nodes[2].bolt11_payment().receive(5_000, &invoice_description.into(), 9217).unwrap(); + if let Ok(payment_id) = nodes[0].bolt11_payment().send(&invoice, None) { + wait_for_payment_success(&nodes[0], payment_id).await; + tokio::time::sleep(Duration::from_millis(50)).await; + for node in nodes { + drain_events(node); + } + return; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + panic!("Timed out waiting for forwarding path readiness"); +} + +async fn open_channel( + node_a: Arc, node_b: Arc, bitcoind: &BitcoindClient, electrsd: &electrsd::ElectrsD, +) -> Duration { + let start = Instant::now(); + let funding_txo = open_channel_to_pending(&node_a, &node_b, 100_000, None, false).await; + let duration = start.elapsed(); + + common::wait_for_tx(&electrsd.client, funding_txo.txid).await; + + generate_blocks_and_wait(bitcoind, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_b, node_a.node_id()); + expect_channel_ready_event!(node_a, node_b.node_id()); + + duration +} + +fn drain_events(node: &Node) { + while node.next_event().is_some() { + node.event_handled().unwrap(); + } +} + +criterion_group!(benches, operations_benchmark); +criterion_main!(benches); diff --git a/benches/payments.rs b/benches/payments.rs index 52769d7949..241d7b2c67 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -8,7 +8,8 @@ use bitcoin::hex::DisplayHex; use bitcoin::Amount; use common::{ expect_channel_ready_event, generate_blocks_and_wait, premine_and_distribute_funds, - random_chain_source, setup_bitcoind_and_electrsd, setup_two_nodes_with_store, + setup_bitcoind_and_electrsd, setup_two_nodes_with_store, store_bench_configs, + wait_for_payment_success, TestChainSource, }; use criterion::{criterion_group, criterion_main, Criterion}; use ldk_node::{Event, Node}; @@ -102,7 +103,7 @@ async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Durat // Send back the money for the next iteration. let mut preimage_bytes = [0u8; 32]; rand::rng().fill_bytes(&mut preimage_bytes); - node_b + let return_payment_id = node_b .spontaneous_payment() .send_with_preimage( amount_msat * total_payments, @@ -112,6 +113,7 @@ async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Durat ) .ok() .unwrap(); + wait_for_payment_success(&node_b, return_payment_id).await; duration } @@ -119,76 +121,84 @@ async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Durat fn payment_benchmark(c: &mut Criterion) { // Set up two nodes. Because this is slow, we reuse the same nodes for each sample. let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let chain_source = random_chain_source(&bitcoind, &electrsd); - - let (node_a, node_b) = setup_two_nodes_with_store( - &chain_source, - false, - true, - false, - common::TestStoreType::Sqlite, - ); - - let runtime = - tokio::runtime::Builder::new_multi_thread().worker_threads(4).enable_all().build().unwrap(); - - let node_a = Arc::new(node_a); - let node_b = Arc::new(node_b); - - // Fund the nodes and setup a channel between them. The criterion function cannot be async, so we need to execute - // the setup using a runtime. - let node_a_cloned = Arc::clone(&node_a); - let node_b_cloned = Arc::clone(&node_b); - runtime.block_on(async move { - let address_a = node_a_cloned.onchain_payment().new_address().unwrap(); - let premine_sat = 25_000_000; - premine_and_distribute_funds( - &bitcoind.client, - &electrsd.client, - vec![address_a], - Amount::from_sat(premine_sat), - ) - .await; - node_a_cloned.sync_wallets().unwrap(); - node_b_cloned.sync_wallets().unwrap(); - open_channel_push_amt( - &node_a_cloned, - &node_b_cloned, - 16_000_000, - Some(1_000_000_000), - false, - &electrsd, - ) - .await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; - node_a_cloned.sync_wallets().unwrap(); - node_b_cloned.sync_wallets().unwrap(); - expect_channel_ready_event!(node_a_cloned, node_b_cloned.node_id()); - expect_channel_ready_event!(node_b_cloned, node_a_cloned.node_id()); - }); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + + let store_configs = store_bench_configs(); + + let runtime = benchmark_runtime(); let mut group = c.benchmark_group("payments"); group.sample_size(10); - group.bench_function("payments", |b| { - // Use custom timing so that sending back the money at the end of each iteration isn't included in the - // measurement. - b.to_async(&runtime).iter_custom(|iter| { + for store_config in store_configs { + let (node_a, node_b) = + setup_two_nodes_with_store(&chain_source, false, true, false, store_config.store_type); + + let node_a = Arc::new(node_a); + let node_b = Arc::new(node_b); + + // Fund the nodes and setup a channel between them. The criterion function cannot be async, + // so we need to execute the setup using a runtime. + let node_a_cloned = Arc::clone(&node_a); + let node_b_cloned = Arc::clone(&node_b); + runtime.block_on(async { + let address_a = node_a_cloned.onchain_payment().new_address().unwrap(); + let premine_sat = 25_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(premine_sat), + ) + .await; + node_a_cloned.sync_wallets().unwrap(); + node_b_cloned.sync_wallets().unwrap(); + open_channel_push_amt( + &node_a_cloned, + &node_b_cloned, + 16_000_000, + Some(1_000_000_000), + false, + &electrsd, + ) + .await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a_cloned.sync_wallets().unwrap(); + node_b_cloned.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a_cloned, node_b_cloned.node_id()); + expect_channel_ready_event!(node_b_cloned, node_a_cloned.node_id()); + }); + + group.bench_function(store_config.name, |b| { + // Use custom timing so that sending back the money at the end of each iteration isn't + // included in the measurement. let node_a = Arc::clone(&node_a); let node_b = Arc::clone(&node_b); - - async move { - let mut total = Duration::ZERO; - for _i in 0..iter { - let node_a = Arc::clone(&node_a); - let node_b = Arc::clone(&node_b); - - total += send_payments(node_a, node_b).await; + b.to_async(&runtime).iter_custom(|iter| { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + + async move { + let mut total = Duration::ZERO; + for _i in 0..iter { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + + total += send_payments(node_a, node_b).await; + } + total } - total - } + }); }); - }); + } +} + +fn benchmark_runtime() -> tokio::runtime::Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(4).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + builder.build().unwrap() } criterion_group!(benches, payment_benchmark); diff --git a/src/io/postgres_store/mod.rs b/src/io/postgres_store/mod.rs index 90b8cdc391..ef2c4cbca9 100644 --- a/src/io/postgres_store/mod.rs +++ b/src/io/postgres_store/mod.rs @@ -35,6 +35,9 @@ pub const DEFAULT_DB_NAME: &str = "ldk_db"; /// The default table in which we store all data. pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data"; +/// Environment variable used by PostgreSQL tests and benchmarks. +pub const POSTGRES_TEST_URL_ENV_VAR: &str = "TEST_POSTGRES_URL"; + // The current schema version for the PostgreSQL store. const SCHEMA_VERSION: u16 = 1; @@ -897,7 +900,7 @@ mod tests { use crate::io::test_utils::{do_read_write_remove_list_persist, do_test_store}; fn test_connection_string() -> String { - std::env::var("TEST_POSTGRES_URL") + std::env::var(POSTGRES_TEST_URL_ENV_VAR) .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/ldk_node_tests".to_string()) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a56d46e056..a0e42c187c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -30,6 +30,7 @@ use std::time::Duration; use bitcoin::hashes::hex::FromHex; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; +use bitcoin::secp256k1::PublicKey; use bitcoin::{ Address, Amount, Network, OutPoint, ScriptBuf, Sequence, Transaction, Txid, Witness, }; @@ -41,6 +42,8 @@ use ldk_node::config::{ HumanReadableNamesConfig, }; use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; +#[cfg(feature = "postgres")] +use ldk_node::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ @@ -48,6 +51,7 @@ use ldk_node::{ PendingSweepBalance, UserChannelId, }; use lightning::io; +use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse}; @@ -304,6 +308,26 @@ macro_rules! expect_payment_successful_event { pub(crate) use expect_payment_successful_event; +pub async fn wait_for_payment_success(node: &Node, expected_payment_id: PaymentId) { + loop { + match node.next_event_async().await { + Event::PaymentSuccessful { payment_id: Some(payment_id), .. } + if payment_id == expected_payment_id => + { + node.event_handled().unwrap(); + break; + }, + Event::PaymentFailed { payment_id, payment_hash, .. } => { + node.event_handled().unwrap(); + panic!("Return payment {:?} failed with hash {:?}", payment_id, payment_hash); + }, + _ => { + node.event_handled().unwrap(); + }, + } + } +} + pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { let bitcoind_exe = env::var("BITCOIND_EXE").ok().or_else(|| corepc_node::downloaded_exe_path().ok()).expect( @@ -420,6 +444,33 @@ pub(crate) enum TestStoreType { TestSyncStore, Sqlite, FilesystemStore, + #[cfg(feature = "postgres")] + Postgres, +} + +#[derive(Clone, Copy)] +pub(crate) struct StoreBenchConfig { + pub(crate) name: &'static str, + pub(crate) store_type: TestStoreType, +} + +pub(crate) fn store_bench_configs() -> Vec { + #[cfg(not(feature = "postgres"))] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: TestStoreType::Sqlite }, + StoreBenchConfig { name: "filesystem", store_type: TestStoreType::FilesystemStore }, + ] + } + + #[cfg(feature = "postgres")] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: TestStoreType::Sqlite }, + StoreBenchConfig { name: "filesystem", store_type: TestStoreType::FilesystemStore }, + StoreBenchConfig { name: "postgres", store_type: TestStoreType::Postgres }, + ] + } } impl Default for TestStoreType { @@ -607,6 +658,31 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestStoreType::FilesystemStore => { builder.build_with_fs_store(config.node_entropy.into()).unwrap() }, + #[cfg(feature = "postgres")] + TestStoreType::Postgres => { + use ldk_node::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR; + + let table_name = format!( + "test_{}", + config + .node_config + .storage_dir_path + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .collect::() + ); + let connection_string = std::env::var(POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()); + builder + .build_with_postgres_store( + config.node_entropy.into(), + connection_string, + None, + Some(table_name), + None, + ) + .unwrap() + }, }; node.start().unwrap(); @@ -830,6 +906,23 @@ pub async fn open_channel( pub async fn open_channel_push_amt( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, should_announce: bool, electrsd: &ElectrsD, +) -> OutPoint { + let funding_txo = open_channel_to_pending( + node_a, + node_b, + funding_amount_sat, + push_amount_msat, + should_announce, + ) + .await; + wait_for_tx(&electrsd.client, funding_txo.txid).await; + + funding_txo +} + +pub async fn open_channel_to_pending( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, + should_announce: bool, ) -> OutPoint { if should_announce { node_a @@ -857,11 +950,40 @@ pub async fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid).await; funding_txo_a } +pub async fn wait_for_channel_ready_events( + node: &Node, counterparty_node_id: PublicKey, count: u64, +) { + let mut remaining_count = count; + while remaining_count > 0 { + let event = tokio::time::timeout( + Duration::from_secs(crate::common::INTEROP_TIMEOUT_SECS), + node.next_event_async(), + ) + .await + .unwrap_or_else(|_| { + panic!("{} timed out waiting for ChannelReady event after 60s", node.node_id()) + }); + + match event { + ref e @ Event::ChannelReady { counterparty_node_id: Some(node_id), .. } + if node_id == counterparty_node_id => + { + println!("{} got event {:?}", node.node_id(), e); + remaining_count -= 1; + }, + ref e @ Event::ChannelReady { .. } => { + panic!("{} got unexpected ChannelReady event: {:?}", node.node_id(), e); + }, + _ => {}, + } + node.event_handled().unwrap(); + } +} + pub async fn open_channel_with_all( node_a: &TestNode, node_b: &TestNode, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { @@ -1931,7 +2053,7 @@ impl TestSyncStoreInner { /// `TEST_POSTGRES_URL` environment variable. #[cfg(feature = "postgres")] pub(crate) fn test_connection_string() -> String { - std::env::var("TEST_POSTGRES_URL") + std::env::var(POSTGRES_TEST_URL_ENV_VAR) .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()) } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index c3c2f4262b..1de57036a6 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -3445,6 +3445,8 @@ async fn build_0_7_0_node( TestStoreType::FilesystemStore => builder_old.build_with_fs_store().unwrap(), TestStoreType::Sqlite => builder_old.build().unwrap(), TestStoreType::TestSyncStore => panic!("TestSyncStore not supported in v0.7.0 builder"), + #[cfg(feature = "postgres")] + TestStoreType::Postgres => panic!("Postgres not supported in v0.7.0 builder"), }; node_old.start().unwrap();