pkg-auto: Run package handling in jobs

This spawns some jobs, where each is waiting for messages from main
process. The message can be either a number followed by the number of
packages to handle (a batch) or command to shut down when there is no
more packages left to process. On the other hand, job can send a
message to the main process that it is done with the batch and is
ready for the next one. Any other message is printed on the terminal
by the main process.

After the packages are processed, the main process will collect and
merge the job reports into the main one.

Signed-off-by: Krzesimir Nowak <knowak@microsoft.com>
This commit is contained in:
Krzesimir Nowak 2025-08-20 15:14:08 +02:00
parent 1c3ce92dc5
commit 096a63239e

View File

@ -1985,13 +1985,101 @@ function package_output_paths_unset() {
unset "${@}"
}
# Fields of package job state struct.
#
# PJS_JOB_IDX - name of a job variable
# PJS_DIR_IDX - path to job's state directory
declare -gri PJS_JOB_IDX=0 PJS_DIR_IDX=1
# Declare package job state variables.
#
# Parameters:
#
# @ - names of variables to be used for package job states
function pkg_job_state_declare() {
struct_declare -ga "${@}" "( '' '' )"
}
# Unset package job state variables.
#
# Parameters:
#
# @ - names of package job state variables
function pkg_job_state_unset() {
local name job_name
for name; do
local -n pkg_job_state_ref=${name}
job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
if [[ -n ${job_name} ]]; then
job_unset "${job_name}"
fi
unset -n ref
done
unset "${@}"
}
# Messages used in communication between package jobs and the main
# process.
#
# READYFORMORE is a message that a package job sends to the main
# process when it is done with processing the current batch of
# packages and asks for more.
#
# WEAREDONE is a message that the main process sends to a package job
# when there are no more packages to process, so the job should
# terminate.
declare -gr ready_for_more_msg='READYFORMORE' we_are_done_msg='WEAREDONE'
# A job function for handling package updates. Receives a batch of
# packages to process, processes them, writes results to a given
# directory and asks for more packages when done.
#
# Parameters:
#
# 1 - output directory for the package handling results
# 2 - name of a bunch of maps variable
function handle_package_changes_job() {
local output_dir=${1}; shift
local bunch_of_maps_var_name=${1}; shift
local we_are_done='' line
local -a reply_lines pair
local -i i pkg_count
local REPLY
while [[ -z ${we_are_done} ]]; do
echo "${ready_for_more_msg}"
read -r
if [[ ${REPLY} = "${we_are_done_msg}" ]]; then
we_are_done=x
elif [[ ${REPLY} =~ ^[0-9]+$ ]]; then
reply_lines=()
pkg_count=${REPLY}
for ((i = 0; i < pkg_count; ++i)); do
read -r
reply_lines+=( "${REPLY}" )
done
for line in "${reply_lines[@]}"; do
mapfile -t pair <<<"${line// /$'\n'}"
if [[ ${#pair[@]} -eq 2 ]]; then
handle_one_package_change "${output_dir}" "${bunch_of_maps_var_name}" "${pair[@]}"
else
echo "invalid message received: ${line@Q}, expected a pair of package names"
fi
done
else
echo "invalid message received: ${REPLY@Q}, expected a number or ${we_are_done_msg@Q}"
fi
done
return 0
}
function handle_one_package_change() {
local output_dir=${1}; shift
local -n bunch_of_maps_ref=${1}; shift
local old_name=${1}; shift
local new_name=${1}; shift
local warnings_dir="${output_dir}"
local warnings_dir="${output_dir}/warnings"
local updates_dir="${output_dir}/updates"
local pkg_to_tags_mvm_var_name=${bunch_of_maps_ref[BOM_PKG_TO_TAGS_MVM_IDX]}
@ -2293,8 +2381,6 @@ function handle_package_changes() {
hpc_package_sources_map=()
read_package_sources hpc_package_sources_map
mkdir -p "${REPORTS_DIR}/updates"
local -a old_pkgs new_pkgs
old_pkgs=()
new_pkgs=()
@ -2381,6 +2467,155 @@ function handle_package_changes() {
hpc_bunch_of_maps[BOM_NEW_PKG_SLOT_VERMINMAX_MAP_MVM_IDX]=hpc_new_pkg_slot_verminmax_map_mvm
hpc_bunch_of_maps[BOM_PKG_SOURCES_MAP_IDX]=hpc_package_sources_map
# We will be spawning as many jobs below as there are available
# processors/cores. Each job has its own work directory and will
# be receiving packages to process in batches of five. Once all
# the packages are processed, their reports are aggregated into a
# single one.
local -i pkg_batch_size=5 this_batch_size pkg_idx=0 pkg_count=${#old_pkgs[@]}
local -a pkg_batch old_pkgs_batch new_pkgs_batch
local pkg_job_top_dir="${WORKDIR}/pkgjobdirs"
create_cleanup_dir "${pkg_job_top_dir}"
local -a pkg_job_state_names=()
local pkg_job_state_name pkg_job_name pkg_job_dir pkg_job_warnings_dir pkg_job_updates_dir
local -i job_count i
get_num_proc job_count
local file
local -a paths
# Set up environment for each job, create a job state and kick off
# the job.
for ((i = 0; i < job_count; ++i)); do
gen_varname pkg_job_state_name
pkg_job_state_declare "${pkg_job_state_name}"
gen_varname pkg_job_name
job_declare "${pkg_job_name}"
pkg_job_dir="${pkg_job_top_dir}/j${i}"
pkg_job_warnings_dir="${pkg_job_dir}/warnings"
pkg_job_updates_dir="${pkg_job_dir}/updates"
create_cleanup_dir "${pkg_job_dir}"
create_cleanup_dir "${pkg_job_warnings_dir}"
create_cleanup_dir "${pkg_job_updates_dir}"
paths=()
for file in developer-warnings warnings manual-work-needed; do
paths+=( "${pkg_job_dir}/warnings/${file}" )
done
for file in summary_stubs changelog_stubs; do
paths+=( "${pkg_job_dir}/updates/${file}" )
done
# TODO: That's a bit messy
add_cleanup "find -P ${pkg_job_updates_dir@Q} -mindepth 1 -maxdepth 1 -type d -exec rm -rf {} +"
add_cleanup "rm -f ${paths[*]@Q}"
job_run -m "${pkg_job_name}" handle_package_changes_job "${pkg_job_dir}" hpc_bunch_of_maps
local -n pkg_job_state_ref="${pkg_job_state_name}"
pkg_job_state_ref[PJS_JOB_IDX]=${pkg_job_name}
pkg_job_state_ref[PJS_DIR_IDX]=${pkg_job_dir}
unset -n pkg_job_state_ref
pkg_job_state_names+=( "${pkg_job_state_name}" )
done
# We have two job arrays, "current" and "next". When iterating the
# "current" array, we will be putting all still alive jobs into
# the "next" array, which will become "current" in the next
# iteration. In every iteration we collect the output and send
# another batch of packages to be processed by a job if it's
# ready. We terminate the jobs when we have run out of
# packages. The looping finishes when all the jobs are terminated.
local -i current_idx=0 next_idx=1 idx state_count=${#pkg_job_state_names[@]}
local -a pkg_job_state_names_0=( "${pkg_job_state_names[@]}" ) pkg_job_state_names_1=() pkg_job_output_lines
local pkg_job_output_line pkg_job_input_sent
while [[ state_count -gt 0 ]]; do
local -n pkg_job_state_names_ref=pkg_job_state_names_${current_idx}
local -n next_pkg_job_state_names_ref=pkg_job_state_names_${next_idx}
next_pkg_job_state_names_ref=()
for pkg_job_state_name in "${pkg_job_state_names_ref[@]}"; do
local -n pkg_job_state_ref=${pkg_job_state_name}
pkg_job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
unset -n pkg_job_state_ref
if job_is_alive "${pkg_job_name}"; then
next_pkg_job_state_names_ref+=( "${pkg_job_state_name}" )
fi
job_get_output "${pkg_job_name}" pkg_job_output_lines
pkg_job_input_sent=
for pkg_job_output_line in "${pkg_job_output_lines[@]}"; do
if [[ ${pkg_job_output_line} = "${ready_for_more_msg}" ]]; then
if [[ -z ${pkg_job_input_sent} ]]; then
if [[ pkg_idx -ge pkg_count ]]; then
job_send_input "${pkg_job_name}" "${we_are_done_msg}"
else
old_pkgs_batch=( "${old_pkgs[@]:pkg_idx:pkg_batch_size}" )
new_pkgs_batch=( "${new_pkgs[@]:pkg_idx:pkg_batch_size}" )
this_batch_size=${#old_pkgs_batch[@]}
pkg_batch=( "${this_batch_size}" )
for ((i = 0; i < this_batch_size; ++i)); do
old_pkg=${old_pkgs_batch[i]}
new_pkg=${new_pkgs_batch[i]}
pkg_batch+=( "${old_pkg} ${new_pkg}" )
done
pkg_idx=$((pkg_idx + pkg_batch_size))
job_send_input "${pkg_job_name}" "${pkg_batch[@]}"
fi
pkg_job_input_sent=x
fi
else
# The job already used info to print this line, so
# we should just echo it, otherwise we will get
# repeated prefixes ("script_name: script_name:
# something happenend")
echo "${pkg_job_output_line}"
fi
done
done
state_count=${#next_pkg_job_state_names_ref[@]}
if [[ state_count -gt 0 ]]; then
sleep 0.2
fi
unset -n pkg_job_state_names_ref next_pkg_job_state_names_ref
idx=${current_idx}
current_idx=${next_idx}
next_idx=${idx}
done
# All the jobs are done, so here we collect all their reports and
# merge them into the main ones in reports directory.
local some_job_failed='' hpc_filename
local -i hpc_rv
truncate --size=0 "${REPORTS_DIR}/updates/summary_stubs" "${REPORTS_DIR}/updates/changelog_stubs"
for pkg_job_state_name in "${pkg_job_state_names[@]}"; do
local -n pkg_job_state_ref=${pkg_job_state_name}
pkg_job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
pkg_job_dir=${pkg_job_state_ref[PJS_DIR_IDX]}
unset -n pkg_job_state_ref
job_reap "${pkg_job_name}" hpc_rv
if [[ hpc_rv -ne 0 ]]; then
some_job_failed=x
fi
for file in "${pkg_job_dir}/warnings/"*; do
basename_out "${file}" hpc_filename
cat "${file}" >>"${REPORTS_DIR}/${hpc_filename}"
done
for file in "${pkg_job_dir}/updates/"*; do
basename_out "${file}" hpc_filename
if [[ -f ${file} ]]; then
cat "${file}" >>"${REPORTS_DIR}/updates/${hpc_filename}"
elif [[ -d ${file} ]]; then
if [[ ! -d "${REPORTS_DIR}/updates/${hpc_filename}" ]]; then
mkdir -p "${REPORTS_DIR}/updates/${hpc_filename}"
fi
mv "${file}/"* "${REPORTS_DIR}/updates/${hpc_filename}"
fi
done
done
# The loop below goes over the pairs of old and new package
# names. For each name there will be some checks done (like does
# this package even exist). Each name in the pair has a set of
@ -2398,20 +2633,16 @@ function handle_package_changes() {
# handled by the automation - in such cases there will be a
# "manual action needed" report.
local pkg_idx=0
local old_name new_name
while [[ ${pkg_idx} -lt ${#old_pkgs[@]} ]]; do
old_name=${old_pkgs["${pkg_idx}"]}
new_name=${new_pkgs["${pkg_idx}"]}
handle_one_package_change "${REPORTS_DIR}" hpc_bunch_of_maps "${old_name}" "${new_name}"
done
pkg_job_state_unset "${pkg_job_state_names[@]}"
bunch_of_maps_unset hpc_bunch_of_maps
mvm_unset hpc_new_pkg_slot_verminmax_map_mvm
mvm_unset hpc_old_pkg_slot_verminmax_map_mvm
mvm_unset hpc_pkg_slots_set_mvm
if [[ -n ${some_job_failed} ]]; then
fail "some job failed"
fi
}
# Gets the first item from the passed set.
@ -3035,17 +3266,19 @@ function handle_gentoo_sync() {
mvm_declare hgs_pkg_to_tags_mvm
process_listings hgs_pkg_to_tags_mvm
# shellcheck source=for-shellcheck/globals
source "${WORKDIR}/globals"
local -A hgs_renames_old_to_new_map=()
process_profile_updates_directory hgs_renames_old_to_new_map
mkdir -p "${REPORTS_DIR}/updates"
handle_package_changes hgs_renames_old_to_new_map hgs_pkg_to_tags_mvm
mvm_unset hgs_pkg_to_tags_mvm
#mvm_debug_disable hgs_pkg_to_tags_mvm
# shellcheck source=for-shellcheck/globals
source "${WORKDIR}/globals"
local old_head new_head
old_head=$(git -C "${OLD_STATE}" rev-parse HEAD)
new_head=$(git -C "${NEW_STATE}" rev-parse HEAD)