From 0a9a84d40d8ab5bbf1ce80446ac4ca4e009214af Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Thu, 25 Jun 2026 16:36:49 +0800 Subject: [PATCH] feat(core): support conditional rename --- core/core/src/blocking/operator.rs | 28 ++++- .../src/docs/comparisons/vs_object_store.md | 4 +- core/core/src/layers/correctness_check.rs | 34 +++++- core/core/src/raw/ops.rs | 25 ++++- core/core/src/types/capability.rs | 2 + core/core/src/types/operator/operator.rs | 101 ++++++++++++++++-- .../src/types/operator/operator_futures.rs | 29 +++++ core/core/src/types/options.rs | 18 ++++ core/services/hdfs/src/backend.rs | 5 +- core/services/hdfs/src/core.rs | 40 ++++++- core/services/hdfs/src/docs.md | 9 +- core/tests/behavior/async_rename.rs | 88 +++++++++++++++ 12 files changed, 360 insertions(+), 23 deletions(-) diff --git a/core/core/src/blocking/operator.rs b/core/core/src/blocking/operator.rs index ddae26fffdd3..6f5375409bc0 100644 --- a/core/core/src/blocking/operator.rs +++ b/core/core/src/blocking/operator.rs @@ -607,7 +607,7 @@ impl Operator { /// /// - `from` and `to` must be a file. /// - `to` will be overwritten if it exists. - /// - If `from` and `to` are the same, a `IsSameFile` error will occur. + /// - If `from` and `to` are the same, an `IsSameFile` error will occur. /// /// # Examples /// @@ -622,10 +622,34 @@ impl Operator { /// # } /// ``` pub fn rename(&self, from: &str, to: &str) -> Result<()> { + self.rename_options(from, to, options::RenameOptions::default()) + } + + /// Rename a file from `from` to `to` with additional options. + /// + /// # Options + /// + /// Visit [`options::RenameOptions`] for all available options. + /// + /// # Examples + /// + /// ``` + /// use opendal_core::blocking; + /// use opendal_core::options::RenameOptions; + /// use opendal_core::Result; + /// + /// fn test(op: blocking::Operator) -> Result<()> { + /// let mut opts = RenameOptions::default(); + /// opts.if_not_exists = true; + /// op.rename_options("path/to/file", "path/to/file2", opts)?; + /// Ok(()) + /// } + /// ``` + pub fn rename_options(&self, from: &str, to: &str, opts: options::RenameOptions) -> Result<()> { let op = self.op.clone(); let from = from.to_string(); let to = to.to_string(); - self.spawn_block(async move { op.rename(&from, &to).await })? + self.spawn_block(async move { op.rename_options(&from, &to, opts).await })? } /// Delete given path. diff --git a/core/core/src/docs/comparisons/vs_object_store.md b/core/core/src/docs/comparisons/vs_object_store.md index 04162014ccd6..d5d1b8eebe42 100644 --- a/core/core/src/docs/comparisons/vs_object_store.md +++ b/core/core/src/docs/comparisons/vs_object_store.md @@ -145,8 +145,8 @@ opendal has an idea called [`Capability`][crate::Capability], so it's services m | list | list_with_delimiter | - | | - | copy | - | | - | copy_if_not_exists | - | -| - | rename | - | -| - | rename_if_not_exists | - | +| rename | rename | - | +| rename_with(if_not_exists) | rename_if_not_exists | - | | presign | - | get a presign URL of object | | multipart | multipart | both support, but API is different | | blocking | - | opendal supports blocking API | diff --git a/core/core/src/layers/correctness_check.rs b/core/core/src/layers/correctness_check.rs index a9fa50b56add..b2de8ce35a69 100644 --- a/core/core/src/layers/correctness_check.rs +++ b/core/core/src/layers/correctness_check.rs @@ -248,6 +248,16 @@ impl Service for CorrectnessService { to: &str, args: OpRename, ) -> Result { + let capability = self.capability(); + let scheme = self.info().scheme(); + if args.if_not_exists() && !capability.rename_with_if_not_exists { + return Err(new_unsupported_error( + scheme, + Operation::Rename, + "if_not_exists", + )); + } + self.inner.rename(ctx, from, to, args).await } @@ -390,10 +400,7 @@ mod tests { _: &str, _: OpRename, ) -> Result { - Err(Error::new( - ErrorKind::Unsupported, - "operation is not supported", - )) + Ok(RpRename::default()) } async fn presign(&self, _: &OperationContext, _: &str, _: OpPresign) -> Result { @@ -562,4 +569,23 @@ mod tests { let res = op.delete_with("path").version("version").await; assert!(res.is_ok()) } + + #[tokio::test] + async fn test_rename_with_if_not_exists() { + let op = new_test_operator(Capability { + rename: true, + ..Default::default() + }); + let res = op.rename_with("from", "to").if_not_exists(true).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + rename: true, + rename_with_if_not_exists: true, + ..Default::default() + }); + let res = op.rename_with("from", "to").if_not_exists(true).await; + assert!(res.is_ok()); + } } diff --git a/core/core/src/raw/ops.rs b/core/core/src/raw/ops.rs index 5e2d7679c316..5a082c6346fa 100644 --- a/core/core/src/raw/ops.rs +++ b/core/core/src/raw/ops.rs @@ -987,11 +987,32 @@ impl From for (OpCopy, OpCopier) { /// Args for `rename` operation. #[derive(Debug, Clone, Default)] -pub struct OpRename {} +pub struct OpRename { + if_not_exists: bool, +} impl OpRename { - /// Create a new `OpMove`. + /// Create a new `OpRename`. pub fn new() -> Self { Self::default() } + + /// Set the if_not_exists flag for the operation. + pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self { + self.if_not_exists = if_not_exists; + self + } + + /// Get if_not_exists flag. + pub fn if_not_exists(&self) -> bool { + self.if_not_exists + } +} + +impl From for OpRename { + fn from(value: options::RenameOptions) -> Self { + Self { + if_not_exists: value.if_not_exists, + } + } } diff --git a/core/core/src/types/capability.rs b/core/core/src/types/capability.rs index f24064a58b8f..9994f6e5cd58 100644 --- a/core/core/src/types/capability.rs +++ b/core/core/src/types/capability.rs @@ -164,6 +164,8 @@ pub struct Capability { /// Indicates if rename operations are supported. pub rename: bool, + /// Indicates if conditional rename operations with if-not-exists are supported. + pub rename_with_if_not_exists: bool, /// Indicates if list operations are supported. pub list: bool, diff --git a/core/core/src/types/operator/operator.rs b/core/core/src/types/operator/operator.rs index 02e5eb7adc81..14c343bc6df9 100644 --- a/core/core/src/types/operator/operator.rs +++ b/core/core/src/types/operator/operator.rs @@ -1389,24 +1389,111 @@ impl Operator { /// # } /// ``` pub async fn rename(&self, from: &str, to: &str) -> Result<()> { + self.rename_options(from, to, options::RenameOptions::default()) + .await + } + + /// Rename a file from `from` to `to` with additional options. + /// + /// # Notes + /// + /// - `from` and `to` must be a file. + /// - If `from` and `to` are the same, an `IsSameFile` error will occur. + /// + /// # Options + /// + /// Visit [`options::RenameOptions`] for all available options. + /// + /// # Examples + /// + /// ``` + /// use opendal_core::Operator; + /// use opendal_core::Result; + /// + /// async fn test(op: Operator) -> Result<()> { + /// op.rename_with("path/to/file", "path/to/file2") + /// .if_not_exists(true) + /// .await?; + /// Ok(()) + /// } + /// ``` + pub fn rename_with( + &self, + from: &str, + to: &str, + ) -> FutureRename>> { + let from = normalize_path(from); + let to = normalize_path(to); + + OperatorFuture::new( + self.context().clone(), + self.service().clone(), + from, + (options::RenameOptions::default(), to), + Self::rename_inner, + ) + } + + /// Rename a file from `from` to `to` with additional options. + /// + /// # Options + /// + /// Visit [`options::RenameOptions`] for all available options. + /// + /// # Examples + /// + /// ``` + /// use opendal_core::options::RenameOptions; + /// use opendal_core::Operator; + /// use opendal_core::Result; + /// + /// async fn test(op: Operator) -> Result<()> { + /// let mut opts = RenameOptions::default(); + /// opts.if_not_exists = true; + /// op.rename_options("path/to/file", "path/to/file2", opts) + /// .await?; + /// Ok(()) + /// } + /// ``` + pub async fn rename_options( + &self, + from: &str, + to: &str, + opts: impl Into, + ) -> Result<()> { let from = normalize_path(from); + let to = normalize_path(to); + let opts = opts.into(); + Self::rename_inner( + self.context().clone(), + self.service().clone(), + from, + (opts, to), + ) + .await + } + + async fn rename_inner( + ctx: OperationContext, + srv: Servicer, + from: String, + (opts, to): (options::RenameOptions, String), + ) -> Result<()> { if !validate_path(&from, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "from path is a directory") .with_operation(Operation::Rename) - .with_context("service", self.info().scheme()) + .with_context("service", srv.info().scheme()) .with_context("from", from), ); } - let to = normalize_path(to); - if !validate_path(&to, EntryMode::FILE) { return Err( Error::new(ErrorKind::IsADirectory, "to path is a directory") .with_operation(Operation::Rename) - .with_context("service", self.info().scheme()) + .with_context("service", srv.info().scheme()) .with_context("to", to), ); } @@ -1415,15 +1502,13 @@ impl Operator { return Err( Error::new(ErrorKind::IsSameFile, "from and to paths are same") .with_operation(Operation::Rename) - .with_context("service", self.info().scheme()) + .with_context("service", srv.info().scheme()) .with_context("from", from) .with_context("to", to), ); } - self.srv - .rename(&self.ctx, &from, &to, OpRename::new()) - .await?; + srv.rename(&ctx, &from, &to, opts.into()).await?; Ok(()) } diff --git a/core/core/src/types/operator/operator_futures.rs b/core/core/src/types/operator/operator_futures.rs index 3c521ec61c1e..9d01f08c3387 100644 --- a/core/core/src/types/operator/operator_futures.rs +++ b/core/core/src/types/operator/operator_futures.rs @@ -1538,3 +1538,32 @@ impl>> FutureCopier { self } } + +/// Future that generated by [`Operator::rename_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub type FutureRename = OperatorFuture<(options::RenameOptions, String), (), F>; + +impl>> FutureRename { + /// Sets the condition that rename operation will succeed only if target does not exist. + /// + /// Refer to [`options::RenameOptions::if_not_exists`] for more details. + /// + /// ### Example + /// + /// ``` + /// use opendal_core::Operator; + /// use opendal_core::Result; + /// + /// async fn test(op: Operator) -> Result<()> { + /// op.rename_with("source/path", "target/path") + /// .if_not_exists(true) + /// .await?; + /// Ok(()) + /// } + /// ``` + pub fn if_not_exists(mut self, v: bool) -> Self { + self.args.0.if_not_exists = v; + self + } +} diff --git a/core/core/src/types/options.rs b/core/core/src/types/options.rs index c35dd49b79c7..6972225fd629 100644 --- a/core/core/src/types/options.rs +++ b/core/core/src/types/options.rs @@ -601,3 +601,21 @@ pub struct CopyOptions { /// step. Services that cannot split copy operations can ignore it. pub chunk: Option, } + +/// Options for rename operations. +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct RenameOptions { + /// Sets the condition that rename operation will succeed only if target does not exist. + /// + /// ### Capability + /// + /// Check [`Capability::rename_with_if_not_exists`] before using this feature. + /// + /// ### Behavior + /// + /// - If the target does not exist, the rename operation succeeds. + /// - If the target exists, the operation returns [`ErrorKind::ConditionNotMatch`]. + /// - If the service does not support this condition, the operation returns + /// [`ErrorKind::Unsupported`]. + pub if_not_exists: bool, +} diff --git a/core/services/hdfs/src/backend.rs b/core/services/hdfs/src/backend.rs index b7805027f598..71e2b4a5581a 100644 --- a/core/services/hdfs/src/backend.rs +++ b/core/services/hdfs/src/backend.rs @@ -171,6 +171,7 @@ impl Builder for HdfsBuilder { list: true, rename: true, + rename_with_if_not_exists: true, shared: true, @@ -273,9 +274,9 @@ impl Service for HdfsBackend { _ctx: &OperationContext, from: &str, to: &str, - _args: OpRename, + args: OpRename, ) -> Result { - self.core.hdfs_rename(from, to)?; + self.core.hdfs_rename(from, to, &args)?; Ok(RpRename::new()) } diff --git a/core/services/hdfs/src/core.rs b/core/services/hdfs/src/core.rs index 0d8767dca0ce..683acb211e25 100644 --- a/core/services/hdfs/src/core.rs +++ b/core/services/hdfs/src/core.rs @@ -22,6 +22,19 @@ use std::sync::Arc; use opendal_core::raw::*; use opendal_core::*; +fn map_hdfs_rename_error(err: io::Error, if_not_exists: bool, to_path: &str) -> Error { + if if_not_exists && err.kind() == io::ErrorKind::AlreadyExists { + return Error::new( + ErrorKind::ConditionNotMatch, + "target path already exists while if_not_exists is set", + ) + .with_context("input", to_path) + .set_source(err); + } + + new_std_io_error(err) +} + /// HdfsCore contains code that directly interacts with HDFS. #[derive(Clone)] pub struct HdfsCore { @@ -146,7 +159,7 @@ impl HdfsCore { } } - pub fn hdfs_rename(&self, from: &str, to: &str) -> Result<()> { + pub fn hdfs_rename(&self, from: &str, to: &str, args: &OpRename) -> Result<()> { let from_path = build_rooted_abs_path(&self.root, from); self.client.metadata(&from_path).map_err(new_std_io_error)?; @@ -176,6 +189,13 @@ impl HdfsCore { } Ok(metadata) => { if metadata.is_file() { + if args.if_not_exists() { + return Err(Error::new( + ErrorKind::ConditionNotMatch, + "target path already exists while if_not_exists is set", + ) + .with_context("input", &to_path)); + } self.client .remove_file(&to_path) .map_err(new_std_io_error)?; @@ -188,8 +208,24 @@ impl HdfsCore { self.client .rename_file(&from_path, &to_path) - .map_err(new_std_io_error)?; + .map_err(|err| map_hdfs_rename_error(err, args.if_not_exists(), &to_path))?; Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn map_existing_target_error_to_condition_not_match() { + let err = map_hdfs_rename_error( + io::Error::new(io::ErrorKind::AlreadyExists, "target exists"), + true, + "/target", + ); + + assert_eq!(err.kind(), ErrorKind::ConditionNotMatch); + } +} diff --git a/core/services/hdfs/src/docs.md b/core/services/hdfs/src/docs.md index dae3eb5fe635..a08bde2f147e 100644 --- a/core/services/hdfs/src/docs.md +++ b/core/services/hdfs/src/docs.md @@ -32,6 +32,13 @@ HDFS support needs to enable feature `services-hdfs`. Refer to [`HdfsBuilder`]'s public API docs for more information. +### Rename Behavior + +HDFS rename follows OpenDAL's public rename contract and overwrites an existing +target file. Use `Operator::rename_with(...).if_not_exists(true)` when the target +file must not already exist. HDFS returns `ConditionNotMatch` in that case and +leaves both source and target files unchanged. + ## Environment HDFS needs some environment set correctly. @@ -110,7 +117,7 @@ Enabling the vendored feature ensures that hdrs includes the necessary libhdfs.s use opendal_core::Operator; use opendal_service_hdfs::Hdfs; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { // Create fs backend builder. let builder = Hdfs::default() diff --git a/core/tests/behavior/async_rename.rs b/core/tests/behavior/async_rename.rs index 530c0419b6f9..325d3aaa72d6 100644 --- a/core/tests/behavior/async_rename.rs +++ b/core/tests/behavior/async_rename.rs @@ -34,6 +34,14 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_rename_overwrite )) } + + if cap.read && cap.write && cap.rename && cap.rename_with_if_not_exists { + tests.extend(async_trials!( + op, + test_rename_with_if_not_exists, + test_rename_with_if_not_exists_returns_condition_not_match + )) + } } /// Rename a file and test with stat. @@ -206,3 +214,83 @@ pub async fn test_rename_overwrite(op: Operator) -> Result<()> { op.delete(&target_path).await.expect("delete must succeed"); Ok(()) } + +/// Rename to a nonexistent path should succeed when if_not_exists is set. +pub async fn test_rename_with_if_not_exists(op: Operator) -> Result<()> { + let parent = format!("{}/", uuid::Uuid::new_v4()); + let source_path = format!("{parent}source"); + let (source_content, _) = gen_bytes(op.info().capability()); + + op.write(&source_path, source_content.clone()).await?; + + let target_path = format!("{parent}target"); + + op.rename_with(&source_path, &target_path) + .if_not_exists(true) + .await?; + + let err = op.stat(&source_path).await.expect_err("stat must fail"); + assert_eq!(err.kind(), ErrorKind::NotFound); + + let target_content = op + .read(&target_path) + .await + .expect("read must succeed") + .to_bytes(); + assert_eq!( + sha256_digest(target_content), + sha256_digest(&source_content), + ); + + op.delete(&source_path).await.expect("delete must succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + op.delete(&parent).await.expect("delete must succeed"); + Ok(()) +} + +/// Rename to an existing path should return ConditionNotMatch when if_not_exists is set. +pub async fn test_rename_with_if_not_exists_returns_condition_not_match( + op: Operator, +) -> Result<()> { + let parent = format!("{}/", uuid::Uuid::new_v4()); + let source_path = format!("{parent}source"); + let (source_content, _) = gen_bytes(op.info().capability()); + op.write(&source_path, source_content.clone()).await?; + + let target_path = format!("{parent}target"); + let (target_content, _) = gen_bytes(op.info().capability()); + assert_ne!(source_content, target_content); + op.write(&target_path, target_content.clone()).await?; + + let err = op + .rename_with(&source_path, &target_path) + .if_not_exists(true) + .await + .expect_err("rename must fail"); + assert_eq!(err.kind(), ErrorKind::ConditionNotMatch); + + let source_content_after = op + .read(&source_path) + .await + .expect("read source must succeed") + .to_bytes(); + assert_eq!( + sha256_digest(source_content_after), + sha256_digest(&source_content), + ); + + let target_content_after = op + .read(&target_path) + .await + .expect("read target must succeed") + .to_bytes(); + assert_eq!( + sha256_digest(target_content_after), + sha256_digest(&target_content), + ); + + op.delete(&source_path).await.expect("delete must succeed"); + op.delete(&target_path).await.expect("delete must succeed"); + op.delete(&parent).await.expect("delete must succeed"); + Ok(()) +}