diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index edda7e0f..969541ad 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -559,6 +559,7 @@ pub async fn handle_upload_part_copy( let mut current_offset = 0; let mut next_block = defragmenter.next().await?; + let mut blocks_to_dup = dest_version.clone(); // TODO this could be optimized similarly to read_and_put_blocks // low priority because uploadpartcopy is rarely used @@ -588,8 +589,7 @@ pub async fn handle_upload_part_copy( .unwrap()?; checksummer = checksummer_updated; - dest_version.blocks.clear(); - dest_version.blocks.put( + let (version_block_key, version_block) = ( VersionBlockKey { part_number, offset: current_offset, @@ -601,37 +601,56 @@ pub async fn handle_upload_part_copy( ); current_offset += data_len; - let block_ref = BlockRef { - block: final_hash, - version: dest_version_id, - deleted: false.into(), + let next = if let Some(final_data) = data_to_upload { + dest_version.blocks.clear(); + dest_version.blocks.put(version_block_key, version_block); + let block_ref = BlockRef { + block: final_hash, + version: dest_version_id, + deleted: false.into(), + }; + let (_, _, _, next) = futures::try_join!( + // Thing 1: if the block is not exactly a block that existed before, + // we need to insert that data as a new block. + garage.block_manager.rpc_put_block( + final_hash, + final_data, + dest_encryption.is_encrypted(), + None + ), + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&dest_version), + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref), + // Thing 4: we need to read the next block + defragmenter.next(), + )?; + next + } else { + blocks_to_dup.blocks.put(version_block_key, version_block); + defragmenter.next().await? }; - - let (_, _, _, next) = futures::try_join!( - // Thing 1: if the block is not exactly a block that existed before, - // we need to insert that data as a new block. - async { - if let Some(final_data) = data_to_upload { - garage - .block_manager - .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None) - .await - } else { - Ok(()) - } - }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&dest_version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), - // Thing 4: we need to read the next block - defragmenter.next(), - )?; next_block = next; } assert_eq!(current_offset, source_range.length); + // Put the duplicated blocks into the version & block_refs tables + let block_refs_to_put = blocks_to_dup + .blocks + .items() + .iter() + .map(|b| BlockRef { + block: b.1.hash, + version: dest_version_id, + deleted: false.into(), + }) + .collect::>(); + futures::try_join!( + garage.version_table.insert(&blocks_to_dup), + garage.block_ref_table.insert_many(&block_refs_to_put[..]), + )?; + let checksums = checksummer.finalize(); let etag = dest_encryption.etag_from_md5(&checksums.md5); let checksum = checksums.extract(dest_object_checksum_algorithm);