speed up UploadPartCopy

(cherry picked from commit db54bf96c7e35851ffbcf3f93fcefb0b9da72000)
This commit is contained in:
Yureka 2025-05-12 19:39:20 +02:00
parent 37e5621dde
commit ffbce0f689

View File

@ -559,6 +559,7 @@ pub async fn handle_upload_part_copy(
let mut current_offset = 0;
let mut next_block = defragmenter.next().await?;
let mut blocks_to_dup = dest_version.clone();
// TODO this could be optimized similarly to read_and_put_blocks
// low priority because uploadpartcopy is rarely used
@ -588,8 +589,7 @@ pub async fn handle_upload_part_copy(
.unwrap()?;
checksummer = checksummer_updated;
dest_version.blocks.clear();
dest_version.blocks.put(
let (version_block_key, version_block) = (
VersionBlockKey {
part_number,
offset: current_offset,
@ -601,37 +601,56 @@ pub async fn handle_upload_part_copy(
);
current_offset += data_len;
let block_ref = BlockRef {
block: final_hash,
version: dest_version_id,
deleted: false.into(),
let next = if let Some(final_data) = data_to_upload {
dest_version.blocks.clear();
dest_version.blocks.put(version_block_key, version_block);
let block_ref = BlockRef {
block: final_hash,
version: dest_version_id,
deleted: false.into(),
};
let (_, _, _, next) = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
garage.block_manager.rpc_put_block(
final_hash,
final_data,
dest_encryption.is_encrypted(),
None
),
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&dest_version),
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to read the next block
defragmenter.next(),
)?;
next
} else {
blocks_to_dup.blocks.put(version_block_key, version_block);
defragmenter.next().await?
};
let (_, _, _, next) = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
async {
if let Some(final_data) = data_to_upload {
garage
.block_manager
.rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
.await
} else {
Ok(())
}
},
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&dest_version),
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to read the next block
defragmenter.next(),
)?;
next_block = next;
}
assert_eq!(current_offset, source_range.length);
// Put the duplicated blocks into the version & block_refs tables
let block_refs_to_put = blocks_to_dup
.blocks
.items()
.iter()
.map(|b| BlockRef {
block: b.1.hash,
version: dest_version_id,
deleted: false.into(),
})
.collect::<Vec<_>>();
futures::try_join!(
garage.version_table.insert(&blocks_to_dup),
garage.block_ref_table.insert_many(&block_refs_to_put[..]),
)?;
let checksums = checksummer.finalize();
let etag = dest_encryption.etag_from_md5(&checksums.md5);
let checksum = checksums.extract(dest_object_checksum_algorithm);