From 2b39ddaeb810b4014dcd9c9bf8af126be9d12c00 Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 1 Jul 2026 20:38:38 -0700 Subject: [PATCH 1/2] [spr] initial version Created using spr 1.3.6-beta.1 --- .../execution/src/test_utils.rs | 9 +- nexus/src/app/sagas/instance_start.rs | 47 +- nexus/src/app/sagas/instance_update/mod.rs | 56 +- nexus/test-utils/src/lib.rs | 4 +- nexus/test-utils/src/nexus_test.rs | 103 ++-- nexus/test-utils/src/starter.rs | 11 +- test-utils/src/dev/dendrite.rs | 222 ++++++-- test-utils/src/dev/mod.rs | 1 + test-utils/src/dev/tcp_proxy.rs | 539 ++++++++++++++++++ wicketd/src/nexus_proxy.rs | 4 + 10 files changed, 787 insertions(+), 209 deletions(-) create mode 100644 test-utils/src/dev/tcp_proxy.rs diff --git a/nexus/reconfigurator/execution/src/test_utils.rs b/nexus/reconfigurator/execution/src/test_utils.rs index fdb17289225..ee61103c878 100644 --- a/nexus/reconfigurator/execution/src/test_utils.rs +++ b/nexus/reconfigurator/execution/src/test_utils.rs @@ -110,8 +110,13 @@ pub fn overridables_for_test( let sled_id = id_str.parse().unwrap(); let ip = Ipv6Addr::LOCALHOST; let mgs_port = cptestctx.gateway.get(&switch_slot).unwrap().port; - let dendrite_port = - cptestctx.dendrite.read().unwrap().get(&switch_slot).unwrap().port; + let dendrite_port = cptestctx + .dendrite + .read() + .unwrap() + .get(&switch_slot) + .unwrap() + .port(); let mgd_port = cptestctx.mgd.get(&switch_slot).unwrap().port; let ddm_port = cptestctx.ddm.get(&switch_slot).unwrap().port; overrides.override_switch_zone_ip(sled_id, ip); diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 26e74cc1890..0a48a549ce0 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -1165,7 +1165,6 @@ async fn sis_ensure_running( mod test { use core::time::Duration; use nexus_types::identity::Asset as _; - use std::net::SocketAddrV6; use crate::app::sagas::disk_delete::test::ExpungeTestHarness; use 
crate::app::sagas::disk_delete::test::create_disk; @@ -1397,19 +1396,14 @@ mod test { .expect("unable to update switch1 settings"); // Shutdown one of the switch daemons - let mut switch0_dpd = cptestctx - .dendrite - .write() - .unwrap() - .remove(&SwitchSlot::Switch0) - .expect("there should be at least one dendrite running"); - - let switch0_port = switch0_dpd.port; - - switch0_dpd - .cleanup() - .await - .expect("switch0 process should get cleaned up"); + let switch0_port = { + let dendrite_guard = cptestctx.dendrite.read().unwrap(); + dendrite_guard + .get(&SwitchSlot::Switch0) + .expect("a dendrite instance should exist for switch0") + .port() + }; + cptestctx.stop_dendrite(SwitchSlot::Switch0).await; let log = &opctx.log; @@ -1455,7 +1449,7 @@ mod test { dendrite_guard .get(&SwitchSlot::Switch1) .expect("two dendrites should be present in test context") - .port + .port() }; let client_state = dpd_client::ClientState { @@ -1505,28 +1499,9 @@ mod test { .await .expect("NAT entry should appear on switch1"); - // Reuse the port number from the removed Switch0 to start a new dendrite instance - let nexus_address = cptestctx.internal_client.bind_address; - let mgs = cptestctx.gateway.get(&SwitchSlot::Switch0).unwrap(); - let mgs_address = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); - // Test fault recovery for nat propogation - // Start a new dendrite instance for switch0 - let new_switch0 = - omicron_test_utils::dev::dendrite::DendriteInstance::start( - switch0_port, - Some(nexus_address), - Some(mgs_address), - ) - .await - .unwrap(); - - cptestctx - .dendrite - .write() - .unwrap() - .insert(SwitchSlot::Switch0, new_switch0); + // Start a new dpd for switch0 + cptestctx.restart_dendrite(SwitchSlot::Switch0).await; // Ensure that the nat entry for the address has made it onto the new switch0 dendrite. // This might take some time while the new dendrite comes online. 
diff --git a/nexus/src/app/sagas/instance_update/mod.rs b/nexus/src/app/sagas/instance_update/mod.rs index f89f0fc856a..3ce790ce0d0 100644 --- a/nexus/src/app/sagas/instance_update/mod.rs +++ b/nexus/src/app/sagas/instance_update/mod.rs @@ -2554,7 +2554,7 @@ mod test { .unwrap(); // Shutdown switch 0. - shutdown_switch0(&cptestctx).await; + cptestctx.stop_dendrite(SwitchSlot::Switch0).await; assert!(switch0_dpd_client.dpd_uptime().await.is_err()); // Okay, now that we've taken down one of the simulated switches, we @@ -2656,7 +2656,7 @@ mod test { .await; // Shut down switch 0. - let switch0_port = shutdown_switch0(&cptestctx).await; + cptestctx.stop_dendrite(SwitchSlot::Switch0).await; assert!(switch0_dpd_client.dpd_uptime().await.is_err()); // Run the instance-update saga to complete the migration. @@ -2696,7 +2696,7 @@ mod test { .unwrap(); // Restart switch 0 and verify it also gets the new entries. - restart_switch0(&cptestctx, switch0_port).await; + cptestctx.restart_dendrite(SwitchSlot::Switch0).await; wait_for_n_nat_entries( log, &switch0_dpd_client, @@ -2819,7 +2819,7 @@ mod test { let port = dendrite_guard .get(&switch_slot) .expect("dendrite should be present for this switch slot") - .port; + .port(); let client_state = dpd_client::ClientState { tag: String::from("nexus"), log: cptestctx.logctx.log.new(o!( @@ -2913,54 +2913,6 @@ mod test { }) } - /// Shut down switch 0's dendrite, returning the port it was listening on. - async fn shutdown_switch0(cptestctx: &ControlPlaneTestContext) -> u16 { - let mut switch0_dpd = cptestctx - .dendrite - .write() - .unwrap() - .remove(&SwitchSlot::Switch0) - .expect("switch 0 dendrite should be running"); - - let port = switch0_dpd.port; - - switch0_dpd - .cleanup() - .await - .expect("switch0 process should get cleaned up"); - - port - } - - /// Restart switch 0's dendrite on the given port. 
- async fn restart_switch0( - cptestctx: &ControlPlaneTestContext, - switch0_port: u16, - ) { - use std::net::Ipv6Addr; - use std::net::SocketAddrV6; - - let nexus_address = cptestctx.internal_client.bind_address; - let mgs = cptestctx.gateway.get(&SwitchSlot::Switch0).unwrap(); - let mgs_address = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); - - let new_switch0 = - omicron_test_utils::dev::dendrite::DendriteInstance::start( - switch0_port, - Some(nexus_address), - Some(mgs_address), - ) - .await - .unwrap(); - - cptestctx - .dendrite - .write() - .unwrap() - .insert(SwitchSlot::Switch0, new_switch0); - } - // === migration test helpers === #[derive(Clone, Copy, Default)] diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index b878297bb08..722de50ce2d 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -131,12 +131,12 @@ pub fn dpd_client( let dendrite_guard = cptestctx.dendrite.read().unwrap(); let (switch_slot, dendrite_instance) = dendrite_guard .iter() - .next() + .find(|(_, instance)| instance.is_dpd_running()) .expect("No dendrite instances running for test"); // Copy the values we need while the guard is still alive let switch_slot = *switch_slot; - let port = dendrite_instance.port; + let port = dendrite_instance.port(); drop(dendrite_guard); let client_state = dpd_client::ClientState { diff --git a/nexus/test-utils/src/nexus_test.rs b/nexus/test-utils/src/nexus_test.rs index 6ca57652363..857369d5db9 100644 --- a/nexus/test-utils/src/nexus_test.rs +++ b/nexus/test-utils/src/nexus_test.rs @@ -113,9 +113,9 @@ pub struct ControlPlaneTestContext { pub oximeter: Oximeter, pub producer: ProducerServer, pub gateway: BTreeMap, + /// All dpd instances, whether currently running or not, indexed by switch + /// slot. 
pub dendrite: RwLock>, - /// Ports of stopped dendrite instances (for use by start_dendrite) - pub stopped_dendrite_ports: RwLock>, pub mgd: HashMap, pub ddm: HashMap, pub external_dns_zone_name: String, @@ -214,22 +214,33 @@ impl ControlPlaneTestContext { /// Stop a Dendrite instance for testing failure scenarios. /// - /// Stores the port so that [`Self::restart_dendrite`] can restart on the same port. + /// Panics if no Dendrite was found for the given switch slot, or it was + /// already stopped. (But note that the Dendrite instance is temporarily + /// removed from the switch slot while it is being stopped, so trying to + /// call this function with the same switch slot concurrently may panic.) pub async fn stop_dendrite(&self, switch_slot: SwitchSlot) { use slog::debug; let log = &self.logctx.log; debug!(log, "Stopping Dendrite"; "switch_slot" => ?switch_slot); - let dendrite_opt = - { self.dendrite.write().unwrap().remove(&switch_slot) }; - if let Some(mut dendrite) = dendrite_opt { - // Store the port for later restart via start_dendrite - self.stopped_dendrite_ports - .write() - .unwrap() - .insert(switch_slot, dendrite.port); - dendrite.cleanup().await.unwrap(); - } + // Extract from mutex first to avoid holding lock across await + let mut dendrite = + self.dendrite.write().unwrap().remove(&switch_slot).unwrap_or_else( + || { + panic!( + "a dendrite instance should exist \ + for switch slot {switch_slot:?}" + ); + }, + ); + + let prior_dpd_state = dendrite.stop_dpd().await.unwrap(); + assert_eq!( + prior_dpd_state, + dev::dendrite::PriorDpdState::Running, + "dendrite should have been running before stop_dendrite" + ); + self.dendrite.write().unwrap().insert(switch_slot, dendrite); } /// Restart a Dendrite instance for testing drift correction scenarios. @@ -237,8 +248,13 @@ impl ControlPlaneTestContext { /// Simulates a switch restart where DPD loses its programmed state. /// Restarts on the same port so test DNS stays valid. 
/// - /// Works both when Dendrite is currently running (will stop and restart) - /// or when it was previously stopped via [`Self::stop_dendrite`]. + /// Works both when Dendrite is currently running (will stop and restart) or + /// when it was previously stopped via [`Self::stop_dendrite`]. + /// + /// Panics if no Dendrite was found for the given switch slot. (But note + /// that the Dendrite instance is temporarily removed from the internal map + /// while it is being restarted, so trying to call this function with the + /// same switch slot concurrently may panic.) pub async fn restart_dendrite(&self, switch_slot: SwitchSlot) { use slog::debug; let log = self.logctx.log.new(slog::o!( @@ -246,48 +262,23 @@ impl ControlPlaneTestContext { )); debug!(log, "Restarting Dendrite"); - // Get port either from running instance or from stored port after stop - // Extract from mutex first to avoid holding lock across await - let old = self.dendrite.write().unwrap().remove(&switch_slot); - let port = if let Some(mut old) = old { - let port = old.port; - debug!( - log, "Shutting down old dpd instance for restart"; - "port" => port, + // Extract from mutex first to avoid holding the lock across an await + // point. This does mean that while restart_dendrite is running, other + // code wouldn't be able to find the dpd instance via the switch slot. 
+ let mut dendrite = + self.dendrite.write().unwrap().remove(&switch_slot).unwrap_or_else( + || { + panic!( + "a dendrite instance should exist \ + for switch slot {switch_slot:?}" + ); + }, ); - old.cleanup().await.unwrap(); - port - } else { - // Must have been stopped - get stored port - let port = self.stopped_dendrite_ports - .write() - .unwrap() - .remove(&switch_slot) - .expect("Dendrite not running and no stored port from stop_dendrite"); - debug!( - log, "Reusing port from previously-shut-down dpd instance"; - "port" => port, - ); - port - }; - - let mgs = self.gateway.get(&switch_slot).unwrap(); - let mgs_addr = std::net::SocketAddrV6::new( - std::net::Ipv6Addr::LOCALHOST, - mgs.port, - 0, - 0, - ) - .into(); + let port = dendrite.port(); + debug!(log, "Restarting dpd behind its proxy"; "port" => port); - let dendrite = - omicron_test_utils::dev::dendrite::DendriteInstance::start( - port, - Some(self.internal_client.bind_address), - Some(mgs_addr), - ) - .await - .unwrap(); + let prior_dpd_state = dendrite.restart_dpd().await.unwrap(); + debug!(log, "Restarted dpd"; "prior_dpd_state" => ?prior_dpd_state); // Wait for Dendrite to be ready before returning. 
// We check `switch_identifiers()` rather than just `dpd_uptime()` @@ -332,7 +323,7 @@ impl ControlPlaneTestContext { for (_, gateway) in self.gateway { gateway.teardown().await; } - for (_, mut dendrite) in self.dendrite.into_inner().unwrap() { + for (_, dendrite) in self.dendrite.into_inner().unwrap() { dendrite.cleanup().await.unwrap(); } for (_, mut mgd) in self.mgd { diff --git a/nexus/test-utils/src/starter.rs b/nexus/test-utils/src/starter.rs index 1050d324e3b..d123d345c97 100644 --- a/nexus/test-utils/src/starter.rs +++ b/nexus/test-utils/src/starter.rs @@ -433,13 +433,13 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { // Set up a stub instance of dendrite let dendrite = dev::dendrite::DendriteInstance::start( - 0, self.nexus_internal_addr, - Some(mgs_addr), + mgs_addr, + &log.new(o!("switch_slot" => format!("{switch_slot:?}"))), ) .await .unwrap(); - let port = dendrite.port; + let port = dendrite.port(); self.dendrite.write().unwrap().insert(switch_slot, dendrite); let address = SocketAddrV6::new(Ipv6Addr::LOCALHOST, port, 0, 0); @@ -507,7 +507,7 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { .unwrap() .get(&switch_slot) .unwrap() - .port, + .port(), mgs: self.gateway.get(&switch_slot).unwrap().port, mgd: self.mgd.get(&switch_slot).unwrap().port, ddm: self.ddm.get(&switch_slot).unwrap().port, @@ -1284,7 +1284,6 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { logctx: self.logctx, gateway: self.gateway, dendrite: RwLock::new(self.dendrite.into_inner().unwrap()), - stopped_dendrite_ports: RwLock::new(HashMap::new()), mgd: self.mgd, ddm: self.ddm, external_dns_zone_name: self.external_dns_zone_name.unwrap(), @@ -1322,7 +1321,7 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { for (_, gateway) in self.gateway { gateway.teardown().await; } - for (_, mut dendrite) in self.dendrite.into_inner().unwrap() { + for (_, dendrite) in self.dendrite.into_inner().unwrap() { dendrite.cleanup().await.unwrap(); } for (_, mut mgd) in 
self.mgd { diff --git a/test-utils/src/dev/dendrite.rs b/test-utils/src/dev/dendrite.rs index 29ba52d5810..52509726834 100644 --- a/test-utils/src/dev/dendrite.rs +++ b/test-utils/src/dev/dendrite.rs @@ -11,57 +11,166 @@ use std::time::Duration; use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; use camino_tempfile::Utf8TempDir; +use slog::Logger; use tokio::{ fs::File, io::{AsyncBufReadExt, BufReader}, time::{Instant, sleep}, }; +use crate::dev::tcp_proxy::{ProxyTarget, RetargetableTcpProxy}; + /// Specifies the amount of time we will wait for `dpd` to launch, /// which is currently confirmed by watching `dpd`'s log output /// for a message specifying the address and port `dpd` is listening on. pub const DENDRITE_TIMEOUT: Duration = Duration::new(30, 0); -/// Represents a running instance of the Dendrite dataplane daemon (dpd). -pub struct DendriteInstance { - /// Port number the dpd instance is listening on. This can be provided - /// manually, or dynamically determined if a value of 0 is provided. - pub port: u16, - /// Arguments provided to the `dpd` cli command. - pub args: Vec, - /// Child process spawned by running `dpd` - pub child: Option, +/// State of a running Dendrite instance. +#[derive(Debug)] +struct RunningDendrite { + /// Child process spawned by running `dpd`. + child: tokio::process::Child, /// Temporary directory where logging output and other files generated by /// `dpd` are stored. - pub data_dir: Option, + data_dir: Utf8PathBuf, +} + +impl RunningDendrite { + async fn stop(mut self) -> Result<(), anyhow::Error> { + self.child.start_kill().context("Sending SIGKILL to child")?; + self.child.wait().await.context("waiting for child")?; + std::fs::remove_dir_all(&self.data_dir).with_context(|| { + format!("cleaning up temporary directory {}", self.data_dir) + })?; + Ok(()) + } +} + +/// The state of dpd before `stop_dpd` or `restart_dpd` was called. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PriorDpdState { + /// dpd was running and was stopped (before being restarted in case of + /// `restart_dpd`). + Running, + /// dpd was already stopped (before being started in case of `restart_dpd`). + Stopped, +} + +/// Represents a running instance of the Dendrite dataplane daemon (dpd). +#[derive(Debug)] +pub struct DendriteInstance { + /// Holds a TCP port bound for the lifetime of this instance, allowing for + /// dpd to be killed and restarted behind the scenes without worrying about + /// ephemeral port reuse. + /// + /// This is always `Some` for the lifetime of the `DendriteInstance`, and is + /// only `None` inside methods that consume the instance. + proxy: Option, + /// Some while dpd is running, otherwise None. + running: Option, + /// The address of the Nexus control plane. + /// + /// dpd uses it to notify the control plane that it has started. + nexus_address: Option, + /// The MGS address. + mgs_address: SocketAddr, } impl DendriteInstance { pub async fn start( - port: u16, nexus_address: Option, - mgs_address: Option, + mgs_address: SocketAddr, + log: &Logger, ) -> Result { - let mut port = port; + let proxy = RetargetableTcpProxy::bind( + ProxyTarget::Refuse, + &log.new(o!("test_component" => "DendriteInstance")), + ) + .await + .context("binding TCP proxy in front of dpd")?; + let mut instance = Self { + proxy: Some(proxy), + running: None, + nexus_address, + mgs_address, + }; + instance.spawn_dpd().await?; + Ok(instance) + } + + /// The port clients should use to connect to dpd. + /// + /// This port stays bound whether dpd is running or not. + pub fn port(&self) -> u16 { + // This is the proxy's port. + self.proxy().port() + } + + /// Returns true if dpd is currently running. + /// + /// Note that [`Self::port`] stays valid either way. + pub fn is_dpd_running(&self) -> bool { + self.running.is_some() + } + + /// Stop the underlying dpd server. 
+ /// + /// Note that [`Self::port`] remains valid after this, since the port + /// represents a TCP proxy. + pub async fn stop_dpd(&mut self) -> Result { + // Make the proxy reject connections before killing dpd. + self.proxy().set_target(ProxyTarget::Refuse).await; + match self.running.take() { + Some(running) => { + running.stop().await?; + Ok(PriorDpdState::Running) + } + None => Ok(PriorDpdState::Stopped), + } + } + + /// Stop dpd if it is running, and then start it again. + /// + /// [`Self::port`] continues to return the same port after this, since it + /// represents a TCP proxy. + pub async fn restart_dpd( + &mut self, + ) -> Result { + // If Dendrite is started before Nexus, the `nexus_address` isn't + // available. A restarted dpd would come up healthy but not be able to + // notify Nexus. Let's panic here, and revisit this if a test + // legitimately needs that combination. + assert!( + self.nexus_address.is_some(), + "restarted dpd should be able to notify Nexus, but no Nexus \ + address was recorded at start (was dendrite started before \ + Nexus?)" + ); + let outcome = self.stop_dpd().await?; + self.spawn_dpd().await?; + Ok(outcome) + } + + async fn spawn_dpd(&mut self) -> Result<(), anyhow::Error> { let temp_dir = Utf8TempDir::new()?; - let address_one = format!("[::1]:{port}"); + // We always bind dpd to a fresh ephemeral port and front it by a TCP + // proxy to avoid port reuse races. See + // https://github.com/oxidecomputer/omicron/issues/10697. 
let mut args = vec![ "run".to_string(), "--listen-addresses".to_string(), - address_one, + "[::1]:0".to_string(), "--enable-rpw".to_string(), ]; - if let Some(socket_addr) = nexus_address { + if let Some(socket_addr) = self.nexus_address { args.push("--nexus-address".to_string()); args.push(socket_addr.to_string()); } - if let Some(socket_addr) = mgs_address { - args.push("--mgs-address".to_string()); - args.push(socket_addr.to_string()); - } + args.push("--mgs-address".to_string()); + args.push(self.mgs_address.to_string()); let mut child = tokio::process::Command::new("dpd") .args(&args) @@ -71,6 +180,9 @@ impl DendriteInstance { "dendrite_stdout", )?)) .stderr(Stdio::piped()) + // If any of the below checks produce an error/early return, kill + // the dpd process. + .kill_on_drop(true) .spawn() .with_context(|| { format!("failed to spawn `dpd` (with args: {:?})", &args) @@ -78,58 +190,58 @@ impl DendriteInstance { let stderr = child.stderr.take().unwrap(); let temp_dir = temp_dir.keep(); - if port == 0 { - port = discover_port( - &mut child, - stderr, - temp_dir.join("dendrite_stdout"), - ) - .await - .with_context(|| { - format!( - "failed to discover dendrite port from files in {temp_dir}" - ) - })?; - } + let dpd_port = discover_port( + &mut child, + stderr, + temp_dir.join("dendrite_stdout"), + ) + .await + .with_context(|| { + format!("failed to discover dendrite port from files in {temp_dir}") + })?; + + let dpd_address = + SocketAddrV6::new(Ipv6Addr::LOCALHOST, dpd_port, 0, 0); + self.proxy().set_target(ProxyTarget::Forward(dpd_address.into())).await; - // Print the dendrite address to stderr. This is captured by nextest and - // kept in the run recording, so that a cross-test ephemeral port - // collision can be traced back to a test even if it passes. See + // Print both the proxy and the backend address to stderr. This is + // captured by nextest and kept in the run recording. See // https://github.com/oxidecomputer/omicron/issues/10697. 
- let address = SocketAddrV6::new(Ipv6Addr::LOCALHOST, port, 0, 0); - eprintln!("Dendrite address: {address}"); + let address = self.proxy().local_addr(); + eprintln!("Dendrite address: {address} (dpd backend: {dpd_address})"); - Ok(Self { port, args, child: Some(child), data_dir: Some(temp_dir) }) + self.running = Some(RunningDendrite { child, data_dir: temp_dir }); + Ok(()) } - pub async fn cleanup(&mut self) -> Result<(), anyhow::Error> { - if let Some(mut child) = self.child.take() { - child.start_kill().context("Sending SIGKILL to child")?; - child.wait().await.context("waiting for child")?; - } - if let Some(dir) = self.data_dir.take() { - std::fs::remove_dir_all(&dir).with_context(|| { - format!("cleaning up temporary directory {dir}") - })?; + pub async fn cleanup(mut self) -> Result<(), anyhow::Error> { + self.stop_dpd().await?; + if let Some(proxy) = self.proxy.take() { + proxy.shutdown().await; } Ok(()) } + + fn proxy(&self) -> &RetargetableTcpProxy { + self.proxy + .as_ref() + .expect("proxy is running across all non-consuming methods") + } } impl Drop for DendriteInstance { fn drop(&mut self) { - if self.child.is_some() || self.data_dir.is_some() { + if let Some(running) = &mut self.running { eprintln!( "WARN: dropped DendriteInstance without cleaning it up first \ (there may still be a child process running and a \ temporary directory leaked)" ); - if let Some(child) = self.child.as_mut() { - let _ = child.start_kill(); - } - if let Some(path) = self.data_dir.take() { - eprintln!("WARN: dendrite temporary directory leaked: {path}"); - } + let _ = running.child.start_kill(); + eprintln!( + "WARN: dendrite temporary directory leaked: {}", + running.data_dir + ); } } } diff --git a/test-utils/src/dev/mod.rs b/test-utils/src/dev/mod.rs index 0acdb180c42..c88e26a3cee 100644 --- a/test-utils/src/dev/mod.rs +++ b/test-utils/src/dev/mod.rs @@ -13,6 +13,7 @@ pub mod maghemite; pub mod poll; #[cfg(feature = "seed-gen")] pub mod seed; +pub mod tcp_proxy; pub mod 
test_cmds; use anyhow::{Context, Result}; diff --git a/test-utils/src/dev/tcp_proxy.rs b/test-utils/src/dev/tcp_proxy.rs new file mode 100644 index 00000000000..12a641a501a --- /dev/null +++ b/test-utils/src/dev/tcp_proxy.rs @@ -0,0 +1,539 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A simple TCP proxy that provides a stable port for test services so that +//! their backends can be restarted without races. +//! +//! Some of our tests do the following set of operations: +//! +//! 1. Start a service on an ephemeral port (i.e., bind to port 0). +//! 2. Kill the service. +//! 3. Restart the service on the same port. +//! +//! The problem is that between steps 2 and 3, it is possible for the port to be +//! reused by a different test running concurrently, causing interference +//! between the tests. +//! +//! This module provides a stable port for that scenario. Clients connect to the +//! TCP proxy; the proxy is notified of whether the backend is up, and if so, +//! which port it is using. +//! +//! Adapted from `wicketd/src/nexus_proxy.rs`. We keep them separate because the +//! proxies are likely to diverge in functionality in the future (e.g., this +//! proxy might gain more fault injection capabilities). + +use std::io; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +use std::sync::Arc; +use std::time::Duration; + +use slog::{Logger, debug, info, o, warn}; +use slog_error_chain::InlineErrorChain; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{RwLock, oneshot}; + +/// The target for a [`RetargetableTcpProxy`]. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ProxyTarget { + /// Forward the connection to the specified backend address. + Forward(SocketAddr), + /// Close the connection immediately, mimicking a dead backend. 
+ Refuse, +} + +/// A simple TCP proxy that forwards connections to a backend which can be +/// changed out from underneath. +#[derive(Debug)] +pub struct RetargetableTcpProxy { + local_addr: SocketAddrV6, + target: Arc>, + shutdown_tx: Option>, + task: Option>, + log: Logger, +} + +impl RetargetableTcpProxy { + /// Start a new [`RetargetableTcpProxy`] that forwards connections to the + /// specified target. + pub async fn bind(target: ProxyTarget, log: &Logger) -> io::Result { + let listener = + TcpListener::bind(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0)) + .await?; + let local_addr = match listener.local_addr()? { + SocketAddr::V6(addr) => addr, + SocketAddr::V4(addr) => { + unreachable!( + "bound an IPv6 listener but the kernel returned \ + an IPv4 local address: {addr}" + ); + } + }; + let log = log.new(o!( + "component" => "RetargetableTcpProxy", + "proxy_addr" => local_addr.to_string(), + )); + info!(log, "TCP proxy listening"; "target" => ?target); + + let target = Arc::new(RwLock::new(target)); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let task = tokio::spawn(run_proxy( + listener, + Arc::clone(&target), + shutdown_rx, + log.clone(), + )); + + Ok(Self { + local_addr, + target, + shutdown_tx: Some(shutdown_tx), + task: Some(task), + log, + }) + } + + /// Return the local address of the proxy. + pub fn local_addr(&self) -> SocketAddrV6 { + self.local_addr + } + + /// Return the TCP port of the proxy. + pub fn port(&self) -> u16 { + self.local_addr.port() + } + + /// Set a new target for the proxy. + /// + /// Existing connections that have already been set up will keep using the + /// old targets. Once this returns, however, no new connections can reach + /// the old target. + /// + /// This is cancel-safe: if the future returned by this method is dropped, + /// the target will not be replaced. + /// + /// Returns the previous target. 
+ pub async fn set_target(&self, target: ProxyTarget) -> ProxyTarget { + let old_target = { + let mut guard = self.target.write().await; + // Note that we don't hold the guard past an await point. + std::mem::replace(&mut *guard, target) + }; + info!( + self.log, "retargeted TCP proxy"; + "old_target" => ?old_target, + "new_target" => ?target, + ); + old_target + } + + /// Stops the accept loop and releases the port, blocking until the port no + /// longer accepts new connections. + /// + /// Established connections will run to completion, but new connections will + /// not be accepted. + /// + /// This is not cancel-safe in the sense that the shutdown message is sent + /// immediately, and that the only thing cancelling the future will do is + /// not wait for the port to fully stop accepting new connections. + pub async fn shutdown(mut self) { + self.send_shutdown(); + if let Some(task) = self.task.take() { + task.await.expect("TCP proxy task exited cleanly"); + } + } + + fn send_shutdown(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + info!(self.log, "shutting down TCP proxy"); + // A send failure means the task already exited. + if tx.send(()).is_err() { + warn!( + self.log, + "failed to send shutdown signal (task already exited)" + ); + } + } + } +} + +impl Drop for RetargetableTcpProxy { + fn drop(&mut self) { + self.send_shutdown(); + } +} + +async fn run_proxy( + listener: TcpListener, + target: Arc>, + mut shutdown_rx: oneshot::Receiver<()>, + log: Logger, +) { + loop { + tokio::select! { + // TcpListener::accept() is cancel-safe, per Tokio documentation. + result = listener.accept() => { + match result { + Ok((client_stream, peer)) => { + let log = log.new(o!("peer" => peer)); + debug!(log, "accepted connection"); + tokio::spawn(run_connection( + client_stream, + Arc::clone(&target), + log, + )); + } + Err(error) => { + // accept() erroring out might be something like EMFILE + // (too many files open). 
We shouldn't exit the proxy on + // something like that -- it's likely transient, and we + // don't have a great reporting channel anyway. + const ACCEPT_ERROR_SLEEP_DURATION: Duration = + Duration::from_millis(100); + + warn!( + log, "accept() failed (continuing after sleep)"; + InlineErrorChain::new(&error), + "sleep_duration" => ?ACCEPT_ERROR_SLEEP_DURATION, + ); + tokio::time::sleep(ACCEPT_ERROR_SLEEP_DURATION).await; + } + } + } + + // This is cancel-safe because it awaits a &mut Future. It is not + // susceptible to futurelock because run_proxy runs on a different + // task from the one that sends the shutdown message. + _ = &mut shutdown_rx => { + info!(log, "TCP proxy exiting"); + return; + } + } + } +} + +/// Causes closing the stream to produce a connection reset, simulating +/// something closer to (but not quite) a dead server. +fn reset_on_drop(stream: &TcpStream, log: &Logger) { + if let Err(error) = stream.set_zero_linger() { + debug!( + log, "set_zero_linger failed (connection already dead?)"; + InlineErrorChain::new(&error), + ); + } +} + +async fn run_connection( + mut client_stream: TcpStream, + target: Arc>, + log: Logger, +) { + let mut backend_stream = { + // Here, the read guard _is_ held across the connect await point so that + // it is synchronized with set_target (which acquires a write lock). But + // it is _not_ held across copy_bidirectional below -- we do not want + // deadlocks involving `set_target` and open connections! 
+ let guard = target.read().await; + match *guard { + ProxyTarget::Forward(backend_addr) => { + match TcpStream::connect(backend_addr).await { + Ok(stream) => { + debug!( + log, "connected to backend"; + "backend_addr" => backend_addr, + ); + stream + } + Err(error) => { + warn!( + log, + "failed to connect to backend; \ + resetting client connection"; + "backend_addr" => backend_addr, + InlineErrorChain::new(&error), + ); + reset_on_drop(&client_stream, &log); + return; + } + } + } + ProxyTarget::Refuse => { + debug!( + log, + "refusing accepted connection; resetting it immediately" + ); + reset_on_drop(&client_stream, &log); + return; + } + } + }; + + // Setting `TCP_NODELAY` avoids Nagle-induced latency. Failure implies that + // the connection is already dead, which will be reported in the + // copy_bidirectional call's result below. + if let Err(error) = client_stream.set_nodelay(true) { + debug!( + log, "set_nodelay failed on client stream"; + InlineErrorChain::new(&error), + ); + } + if let Err(error) = backend_stream.set_nodelay(true) { + debug!( + log, "set_nodelay failed on backend stream"; + InlineErrorChain::new(&error), + ); + } + + match tokio::io::copy_bidirectional(&mut client_stream, &mut backend_stream) + .await + { + Ok((client_to_backend, backend_to_client)) => { + debug!( + log, "proxied connection closed"; + "bytes_client_to_backend" => client_to_backend, + "bytes_backend_to_client" => backend_to_client, + ); + } + Err(error) => { + // This is expected when the backend is SIGKILLed mid-connection. + // Log it as a warning anyway to flag cases where the process + // aborted or similar. 
+ warn!( + log, "proxied connection ended with error"; + InlineErrorChain::new(&error), + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + enum Persistence { + Persistent, + OneShot, + } + + /// A simple TCP backend that echoes what was provided to it. + #[derive(Debug)] + struct TestBackend { + addr: SocketAddr, + accepts: Arc, + } + + impl TestBackend { + async fn spawn( + reply_prefix: &'static [u8], + persistence: Persistence, + ) -> Self { + let listener = TcpListener::bind(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + 0, + 0, + 0, + )) + .await + .expect("bound backend listener"); + let addr = listener.local_addr().expect("read backend local addr"); + let accepts = Arc::new(AtomicUsize::new(0)); + let accepts_bg = Arc::clone(&accepts); + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + return; + }; + accepts_bg.fetch_add(1, Ordering::Relaxed); + tokio::spawn(async move { + let mut buf = [0u8; 5]; + loop { + if stream.read_exact(&mut buf).await.is_err() { + return; + } + if !reply_prefix.is_empty() + && stream.write_all(reply_prefix).await.is_err() + { + return; + } + if stream.write_all(&buf).await.is_err() { + return; + } + match persistence { + Persistence::OneShot => return, + Persistence::Persistent => { + // Keep looping to read more data. 
+ } + } + } + }); + } + }); + Self { addr, accepts } + } + + fn addr(&self) -> SocketAddr { + self.addr + } + + fn accepted(&self) -> usize { + self.accepts.load(Ordering::Relaxed) + } + } + + async fn roundtrip(proxy_addr: SocketAddr, request: &[u8; 5]) -> Vec { + let mut conn = + TcpStream::connect(proxy_addr).await.expect("connected to proxy"); + conn.write_all(request).await.expect("wrote request"); + let mut response = Vec::new(); + conn.read_to_end(&mut response).await.expect("read response"); + response + } + + #[tokio::test] + async fn proxy_forwards_and_retargets() { + let logctx = crate::dev::test_setup_log("proxy_forwards_and_retargets"); + let backend_a = TestBackend::spawn(b"A:", Persistence::OneShot).await; + let backend_b = TestBackend::spawn(b"B:", Persistence::OneShot).await; + + let proxy = RetargetableTcpProxy::bind( + ProxyTarget::Forward(backend_a.addr()), + &logctx.log, + ) + .await + .expect("bound proxy"); + let proxy_addr = SocketAddr::V6(proxy.local_addr()); + assert_ne!(proxy.port(), 0); + + assert_eq!(roundtrip(proxy_addr, b"hello").await, b"A:hello"); + + // Once Refuse is set, connections are accepted and closed immediately. + proxy.set_target(ProxyTarget::Refuse).await; + let mut conn = + TcpStream::connect(proxy_addr).await.expect("connected to proxy"); + let mut response = Vec::new(); + // reset_on_drop sets SO_LINGER(0) which shows up as a ConnectionReset. + let error = conn + .read_to_end(&mut response) + .await + .expect_err("connection is reset in Refuse mode"); + assert_eq!(error.kind(), std::io::ErrorKind::ConnectionReset); + + // Test the point of the TCP proxy. While the backend is down, the proxy + // must keep the port bound so a concurrently running process (like a + // test) can't grab it. 
+ TcpListener::bind(proxy_addr).await.expect_err( + "proxy keeps the port bound while the backend is down, so a \ + concurrent binder cannot steal it", + ); + + proxy.set_target(ProxyTarget::Forward(backend_b.addr())).await; + assert_eq!(roundtrip(proxy_addr, b"world").await, b"B:world"); + + proxy.shutdown().await; + // The port must be rebindable after shutdown. + TcpListener::bind(proxy_addr) + .await + .expect("proxy port is released after shutdown"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn refuse_quiesces_old_backend() { + let logctx = crate::dev::test_setup_log("refuse_quiesces_old_backend"); + let backend = TestBackend::spawn(b"A:", Persistence::OneShot).await; + + let proxy = RetargetableTcpProxy::bind( + ProxyTarget::Forward(backend.addr()), + &logctx.log, + ) + .await + .expect("bound proxy"); + let proxy_addr = SocketAddr::V6(proxy.local_addr()); + + assert_eq!(roundtrip(proxy_addr, b"hello").await, b"A:hello"); + assert_eq!(backend.accepted(), 1); + + proxy.set_target(ProxyTarget::Refuse).await; + + // Attempt to hit the backend a bunch of times now. It should never be + // successful. 
+ for _ in 0..16 { + let mut conn = TcpStream::connect(proxy_addr) + .await + .expect("connected to proxy"); + let mut response = Vec::new(); + let error = conn + .read_to_end(&mut response) + .await + .expect_err("connection is reset in Refuse mode"); + assert_eq!(error.kind(), std::io::ErrorKind::ConnectionReset); + } + assert_eq!( + backend.accepted(), + 1, + "no connection should reach the backend once the proxy refuses" + ); + + proxy.shutdown().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn set_target_does_not_deadlock_with_live_connection() { + let logctx = crate::dev::test_setup_log( + "set_target_does_not_deadlock_with_live_connection", + ); + let backend = TestBackend::spawn(b"", Persistence::Persistent).await; + + let proxy = RetargetableTcpProxy::bind( + ProxyTarget::Forward(backend.addr()), + &logctx.log, + ) + .await + .expect("bound proxy"); + let proxy_addr = SocketAddr::V6(proxy.local_addr()); + + // Exchange a message so this connection's copy_bidirectional is + // actively running. (If the read guard weren't released before + // copy_bidirectional, it would be held here.) + let mut conn = + TcpStream::connect(proxy_addr).await.expect("connected to proxy"); + conn.write_all(b"ping1").await.expect("wrote first message"); + let mut buf = [0u8; 5]; + conn.read_exact(&mut buf).await.expect("read first echo"); + assert_eq!(&buf, b"ping1"); + + // Attempt to call set_target while the connection is open. If the + // timeout is hit, then the read guard is held for some reason. + const SET_TARGET_TIMEOUT: Duration = Duration::from_secs(10); + tokio::time::timeout( + SET_TARGET_TIMEOUT, + proxy.set_target(ProxyTarget::Refuse), + ) + .await + .expect( + "set_target should complete promptly while a connection is open", + ); + + // The established connection keeps using the old target. 
+ conn.write_all(b"ping2").await.expect("wrote second message"); + conn.read_exact(&mut buf).await.expect("read second echo"); + assert_eq!(&buf, b"ping2"); + + // But new connections are refused. + let mut fresh = + TcpStream::connect(proxy_addr).await.expect("connected to proxy"); + let mut response = Vec::new(); + let error = fresh + .read_to_end(&mut response) + .await + .expect_err("new connection is refused after retarget"); + assert_eq!(error.kind(), std::io::ErrorKind::ConnectionReset); + + proxy.shutdown().await; + logctx.cleanup_successful(); + } +} diff --git a/wicketd/src/nexus_proxy.rs b/wicketd/src/nexus_proxy.rs index df29b8f21e5..09255898508 100644 --- a/wicketd/src/nexus_proxy.rs +++ b/wicketd/src/nexus_proxy.rs @@ -4,6 +4,10 @@ //! TCP proxy to expose Nexus's external API via the techport. +// A similar test-only TCP proxy lives in omicron-test-utils's dev::tcp_proxy. +// If we need this pattern in the future, it might be worth extracting a shared +// implementation. (Or not, since the proxy is quite straightforward.) 
+ use internal_dns_resolver::Resolver; use internal_dns_types::names::ServiceName; use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; From cf9814581729dfe982c68893f9fcc0b5c46dea26 Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 1 Jul 2026 20:48:36 -0700 Subject: [PATCH 2/2] fix build failures, add to flake-patterns.adoc Created using spr 1.3.6-beta.1 --- docs/flake-patterns.adoc | 18 ++++++++++++++++++ nexus/test-utils/src/nexus_test.rs | 2 +- test-utils/src/dev/tcp_proxy.rs | 2 +- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/flake-patterns.adoc b/docs/flake-patterns.adoc index 9454dd8f8da..ee77a5a9b4f 100644 --- a/docs/flake-patterns.adoc +++ b/docs/flake-patterns.adoc @@ -188,3 +188,21 @@ NOTE: The `serial_test` crate **does not work** within Omicron: it depends on an * https://github.com/oxidecomputer/omicron/commit/c9dec4111860[`c9dec4111860`] ensures that learner nodes' ledger file names cannot collide with those of regular peer nodes. (Learner 1 and peer node `pc-b-1` both generated `test-1-network-config-ledger`.) * https://github.com/oxidecomputer/omicron/commit/067c79302158[`067c79302158`] changes a scenario from being a separate test to being listed within `test_replicated`. + +=== Ephemeral port reuse races [[ephemeral-port-reuse-races]] + +**What this is:** A test performs the following sequence of operations: + +. Start a service on an ephemeral TCP port (i.e., bind to port 0). +. Kill the service. +. Restart the service, attempting to bind to the port determined in step 1. + +In between steps 2 and 3, a different process such as a test running concurrently can grab the same port. This can result in test flakiness. + +**Why this is bad:** This is a test flake at best and cross-test interference at worst. + +**How to fix this:** Use the `RetargetableTcpProxy` available at `test-utils/src/dev/tcp_proxy.rs`. That provides a persistent port that stays bound throughout process restarts. 
+ +**Example commits:** + +* https://github.com/oxidecomputer/omicron/commit/AAAAA[`AAAAA`] introduces the TCP proxy. diff --git a/nexus/test-utils/src/nexus_test.rs b/nexus/test-utils/src/nexus_test.rs index 857369d5db9..f027c5a7d66 100644 --- a/nexus/test-utils/src/nexus_test.rs +++ b/nexus/test-utils/src/nexus_test.rs @@ -278,7 +278,7 @@ impl ControlPlaneTestContext { debug!(log, "Restarting dpd behind its proxy"; "port" => port); let prior_dpd_state = dendrite.restart_dpd().await.unwrap(); - debug!(log; "Restarted dpd"; "prior_dpd_state" => ?prior_dpd_state); + debug!(log, "Restarted dpd"; "prior_dpd_state" => ?prior_dpd_state); // Wait for Dendrite to be ready before returning. // We check `switch_identifiers()` rather than just `dpd_uptime()` diff --git a/test-utils/src/dev/tcp_proxy.rs b/test-utils/src/dev/tcp_proxy.rs index 12a641a501a..fe3b8f5bfba 100644 --- a/test-utils/src/dev/tcp_proxy.rs +++ b/test-utils/src/dev/tcp_proxy.rs @@ -9,7 +9,7 @@ //! //! 1. Start a service on an ephemeral port (i.e., bind to port 0). //! 2. Kill the service. -//! 3. Restart the service on the same port. +//! 3. Restart the service, attempting to bind to the port determined in step 1. //! //! The problem is that between steps 2 and 3, it is possible for the port to be //! reused by a different test running concurrently, causing interference