From fc8fc60f6dae85c70a6350fdcfd560f024656c0e Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 30 May 2025 16:24:12 +0000 Subject: [PATCH] emit internal error when we detect race condition (#1053) (fix #1050) i went with a `500`/`InternalError`/`Please try again.` because that is something i've seen AWS S3 report while developing other software, and i'm not convinced all clients would understand a 409 conflict properly (GET don't usually conflict) Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/1053 Co-authored-by: trinity-1686a Co-committed-by: trinity-1686a --- src/api/s3/copy.rs | 4 +++- src/api/s3/get.rs | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 969541ad..47a63c82 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -26,7 +26,7 @@ use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; use crate::encryption::EncryptionParams; use crate::error::*; -use crate::get::{full_object_byte_stream, PreconditionHeaders}; +use crate::get::{check_version_not_deleted, full_object_byte_stream, PreconditionHeaders}; use crate::multipart; use crate::put::{extract_metadata_headers, save_stream, ChecksumMode, SaveStreamResult}; use crate::website::X_AMZ_WEBSITE_REDIRECT_LOCATION; @@ -237,6 +237,7 @@ async fn handle_copy_metaonly( .get(&source_version.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NoSuchKey)?; + check_version_not_deleted(&source_version)?; // Write an "uploading" marker in Object table // This holds a reference to the object in the Version table @@ -428,6 +429,7 @@ pub async fn handle_upload_part_copy( .get(&source_object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; + check_version_not_deleted(&source_version)?; // We want to reuse blocks from the source version as much as possible. 
// However, we still need to get the data from these blocks diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 22076603..888a040a 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -19,12 +19,13 @@ use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; -use garage_util::error::OkOrMessage; +use garage_util::error::{Error as UtilError, OkOrMessage}; use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use garage_api_common::common_error::CommonError; use garage_api_common::helpers::*; use garage_api_common::signature::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE}; @@ -215,6 +216,7 @@ pub async fn handle_head_without_ctx( .get(&object_version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; + check_version_not_deleted(&version)?; let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; @@ -365,6 +367,21 @@ pub async fn handle_get_without_ctx( } } +pub(crate) fn check_version_not_deleted(version: &Version) -> Result<(), Error> { + if version.deleted.get() { + // the version was deleted between when the object_table was consulted + // and now; this could mean the object was deleted, or overridden. + // Rather than say the key doesn't exist, return a transient error + // to signal the client to try again. 
+ return Err(CommonError::InternalError(UtilError::Message( + "conflict/inconsistency between object and version state, version is deleted" + .to_string(), + )) + .into()); + } + Ok(()) +} + async fn handle_get_full( garage: Arc<Garage>, version: &ObjectVersion, @@ -431,6 +448,7 @@ pub fn full_object_byte_stream( .ok_or_message("channel closed")?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; + check_version_not_deleted(&version)?; for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { let stream_block_i = encryption .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64))) @@ -446,6 +464,14 @@ pub fn full_object_byte_stream( { Ok(()) => (), Err(e) => { + // TODO: I think this is a bad idea, we should log + // an error and stop there. If the error happens to + // be exactly the size of what hasn't been streamed + // yet, the client will see the request as a + // success. + // Truncating the output instead would notify the client + // that something happened with their download, so that + // they can retry it. let _ = tx.send(error_stream_item(e)).await; } } @@ -497,7 +523,7 @@ async fn handle_get_range( .get(&version.uuid, &EmptyKey) .await? .ok_or(Error::NoSuchKey)?; - + check_version_not_deleted(&version)?; let body = body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end); Ok(resp_builder.body(body)?) @@ -548,6 +574,8 @@ async fn handle_get_part( .await? .ok_or(Error::NoSuchKey)?; + check_version_not_deleted(&version)?; + let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;