u-boot/tools/patman/cseries.py
Simon Glass ce785fd9b7 patman: Add a Cseries class
This is the main class for dealing with series, across branches and the
database.

Signed-off-by: Simon Glass <sjg@chromium.org>
2025-05-27 10:07:42 +01:00

1166 lines
44 KiB
Python

# SPDX-License-Identifier: GPL-2.0+
#
# Copyright 2025 Google LLC
#
"""Handles the 'series' subcommand
"""
import asyncio
from collections import OrderedDict, defaultdict
import pygit2
from u_boot_pylib import cros_subprocess
from u_boot_pylib import gitutil
from u_boot_pylib import terminal
from u_boot_pylib import tout
from patman import patchstream
from patman import cser_helper
from patman.cser_helper import AUTOLINK, oid
from patman import send
from patman import status
class Cseries(cser_helper.CseriesHelper):
"""Database with information about series
This class handles database read/write as well as operations in a git
directory to update series information.
"""
def __init__(self, topdir=None, colour=terminal.COLOR_IF_TERMINAL):
"""Set up a new Cseries
Args:
topdir (str): Top-level directory of the repo
colour (terminal.enum): Whether to enable ANSI colour or not
"""
super().__init__(topdir, colour)
def add(self, branch_name, desc=None, mark=False, allow_unmarked=False,
end=None, force_version=False, dry_run=False):
"""Add a series (or new version of a series) to the database
Args:
branch_name (str): Name of branch to sync, or None for current one
desc (str): Description to use, or None to use the series subject
mark (str): True to mark each commit with a change ID
allow_unmarked (str): True to not require each commit to be marked
end (str): Add only commits up to but exclu
force_version (bool): True if ignore a Series-version tag that
doesn't match its branch name
dry_run (bool): True to do a dry run
"""
name, ser, version, msg = self.prep_series(branch_name, end)
tout.info(f"Adding series '{ser.name}' v{version}: mark {mark} "
f'allow_unmarked {allow_unmarked}')
if msg:
tout.info(msg)
if desc is None:
if not ser.cover:
raise ValueError(f"Branch '{name}' has no cover letter - "
'please provide description')
desc = ser['cover'][0]
ser = self._handle_mark(name, ser, version, mark, allow_unmarked,
force_version, dry_run)
link = ser.get_link_for_version(version)
msg = 'Added'
added = False
series_id = self.db.series_find_by_name(ser.name)
if not series_id:
series_id = self.db.series_add(ser.name, desc)
added = True
msg += f" series '{ser.name}'"
if version not in self._get_version_list(series_id):
svid = self.db.ser_ver_add(series_id, version, link)
msg += f" v{version}"
if not added:
msg += f" to existing series '{ser.name}'"
added = True
self._add_series_commits(ser, svid)
count = len(ser.commits)
msg += f" ({count} commit{'s' if count > 1 else ''})"
if not added:
tout.info(f"Series '{ser.name}' v{version} already exists")
msg = None
elif not dry_run:
self.commit()
else:
self.rollback()
series_id = None
ser.desc = desc
ser.idnum = series_id
if msg:
tout.info(msg)
if dry_run:
tout.info('Dry run completed')
def decrement(self, series, dry_run=False):
"""Decrement a series to the previous version and delete the branch
Args:
series (str): Name of series to use, or None to use current branch
dry_run (bool): True to do a dry run
"""
ser = self._parse_series(series)
if not ser.idnum:
raise ValueError(f"Series '{ser.name}' not found in database")
max_vers = self._series_max_version(ser.idnum)
if max_vers < 2:
raise ValueError(f"Series '{ser.name}' only has one version")
tout.info(f"Removing series '{ser.name}' v{max_vers}")
new_max = max_vers - 1
repo = pygit2.init_repository(self.gitdir)
if not dry_run:
name = self._get_branch_name(ser.name, new_max)
branch = repo.lookup_branch(name)
try:
repo.checkout(branch)
except pygit2.errors.GitError:
tout.warning(f"Failed to checkout branch {name}")
raise
del_name = f'{ser.name}{max_vers}'
del_branch = repo.lookup_branch(del_name)
branch_oid = del_branch.peel(pygit2.enums.ObjectType.COMMIT).oid
del_branch.delete()
print(f"Deleted branch '{del_name}' {oid(branch_oid)}")
self.db.ser_ver_remove(ser.idnum, max_vers)
if not dry_run:
self.commit()
else:
self.rollback()
def increment(self, series_name, dry_run=False):
"""Increment a series to the next version and create a new branch
Args:
series_name (str): Name of series to use, or None to use current
branch
dry_run (bool): True to do a dry run
"""
ser = self._parse_series(series_name)
if not ser.idnum:
raise ValueError(f"Series '{ser.name}' not found in database")
max_vers = self._series_max_version(ser.idnum)
branch_name = self._get_branch_name(ser.name, max_vers)
on_branch = gitutil.get_branch(self.gitdir) == branch_name
svid = self.get_series_svid(ser.idnum, max_vers)
pwc = self.get_pcommit_dict(svid)
count = len(pwc.values())
series = patchstream.get_metadata(branch_name, 0, count,
git_dir=self.gitdir)
tout.info(f"Increment '{ser.name}' v{max_vers}: {count} patches")
# Create a new branch
vers = max_vers + 1
new_name = self._join_name_version(ser.name, vers)
self.update_series(branch_name, series, max_vers, new_name, dry_run,
add_vers=vers, switch=on_branch)
old_svid = self.get_series_svid(ser.idnum, max_vers)
pcd = self.get_pcommit_dict(old_svid)
svid = self.db.ser_ver_add(ser.idnum, vers)
self.db.pcommit_add_list(svid, pcd.values())
if not dry_run:
self.commit()
else:
self.rollback()
# repo.head.set_target(amended)
tout.info(f'Added new branch {new_name}')
if dry_run:
tout.info('Dry run completed')
def link_set(self, series_name, version, link, update_commit):
"""Add / update a series-links link for a series
Args:
series_name (str): Name of series to use, or None to use current
branch
version (int): Version number, or None to detect from name
link (str): Patchwork link-string for the series
update_commit (bool): True to update the current commit with the
link
"""
ser, version = self._parse_series_and_version(series_name, version)
self._ensure_version(ser, version)
self._set_link(ser.idnum, ser.name, version, link, update_commit)
self.commit()
tout.info(f"Setting link for series '{ser.name}' v{version} to {link}")
def link_get(self, series, version):
"""Get the patchwork link for a version of a series
Args:
series (str): Name of series to use, or None to use current branch
version (int): Version number or None for current
Return:
str: Patchwork link as a string, e.g. '12325'
"""
ser, version = self._parse_series_and_version(series, version)
self._ensure_version(ser, version)
return self.db.ser_ver_get_link(ser.idnum, version)
def link_search(self, pwork, series, version):
"""Search patch for the link for a series
Returns either the single match, or None, in which case the second part
of the tuple is filled in
Args:
pwork (Patchwork): Patchwork object to use
series (str): Series name to search for, or None for current series
that is checked out
version (int): Version to search for, or None for current version
detected from branch name
Returns:
tuple:
int: ID of the series found, or None
list of possible matches, or None, each a dict:
'id': series ID
'name': series name
str: series name
int: series version
str: series description
"""
_, ser, version, _, _, _, _, _ = self._get_patches(series, version)
if not ser.desc:
raise ValueError(f"Series '{ser.name}' has an empty description")
pws, options = self.loop.run_until_complete(pwork.find_series(
ser, version))
return pws, options, ser.name, version, ser.desc
def link_auto(self, pwork, series, version, update_commit, wait_s=0):
"""Automatically find a series link by looking in patchwork
Args:
pwork (Patchwork): Patchwork object to use
series (str): Series name to search for, or None for current series
that is checked out
version (int): Version to search for, or None for current version
detected from branch name
update_commit (bool): True to update the current commit with the
link
wait_s (int): Number of seconds to wait for the autolink to succeed
"""
start = self.get_time()
stop = start + wait_s
sleep_time = 5
while True:
pws, options, name, version, desc = self.link_search(
pwork, series, version)
if pws:
if wait_s:
tout.info('Link completed after '
f'{self.get_time() - start} seconds')
break
print(f"Possible matches for '{name}' v{version} desc '{desc}':")
print(' Link Version Description')
for opt in options:
print(f"{opt['id']:6} {opt['version']:7} {opt['name']}")
if not wait_s or self.get_time() > stop:
delay = f' after {wait_s} seconds' if wait_s else ''
raise ValueError(f"Cannot find series '{desc}{delay}'")
self.sleep(sleep_time)
self.link_set(name, version, pws, update_commit)
def link_auto_all(self, pwork, update_commit, link_all_versions,
replace_existing, dry_run, show_summary=True):
"""Automatically find a series link by looking in patchwork
Args:
pwork (Patchwork): Patchwork object to use
update_commit (bool): True to update the current commit with the
link
link_all_versions (bool): True to sync all versions of a series,
False to sync only the latest version
replace_existing (bool): True to sync a series even if it already
has a link
dry_run (bool): True to do a dry run
show_summary (bool): True to show a summary of how things went
Return:
OrderedDict of summary info:
key (int): ser_ver ID
value (AUTOLINK): result of autolinking on this ser_ver
"""
sdict = self.db.series_get_dict_by_id()
all_ser_vers = self._get_autolink_dict(sdict, link_all_versions)
# Get rid of things without a description
valid = {}
state = {}
no_desc = 0
not_found = 0
updated = 0
failed = 0
already = 0
for svid, (ser_id, name, version, link, desc) in all_ser_vers.items():
if link and not replace_existing:
state[svid] = f'already:{link}'
already += 1
elif desc:
valid[svid] = ser_id, version, link, desc
else:
no_desc += 1
state[svid] = 'missing description'
results, requests = self.loop.run_until_complete(
pwork.find_series_list(valid))
for svid, ser_id, link, _ in results:
if link:
version = all_ser_vers[svid][2]
if self._set_link(ser_id, sdict[ser_id].name, version,
link, update_commit, dry_run=dry_run):
updated += 1
state[svid] = f'linked:{link}'
else:
failed += 1
state[svid] = 'failed'
else:
not_found += 1
state[svid] = 'not found'
# Create a summary sorted by name and version
summary = OrderedDict()
for svid in sorted(all_ser_vers, key=lambda k: all_ser_vers[k][1:2]):
_, name, version, link, ser = all_ser_vers[svid]
summary[svid] = AUTOLINK(name, version, link, ser.desc,
state[svid])
if show_summary:
msg = f'{updated} series linked'
if already:
msg += f', {already} already linked'
if not_found:
msg += f', {not_found} not found'
if no_desc:
msg += f', {no_desc} missing description'
if failed:
msg += f', {failed} updated failed'
tout.info(msg + f' ({requests} requests)')
tout.info('')
tout.info(f"{'Name':15} Version {'Description':40} Result")
border = f"{'-' * 15} ------- {'-' * 40} {'-' * 15}"
tout.info(border)
for name, version, link, desc, state in summary.values():
bright = True
if state.startswith('already'):
col = self.col.GREEN
bright = False
elif state.startswith('linked'):
col = self.col.MAGENTA
else:
col = self.col.RED
col_state = self.col.build(col, state, bright)
tout.info(f"{name:16.16} {version:7} {desc or '':40.40} "
f'{col_state}')
tout.info(border)
if dry_run:
tout.info('Dry run completed')
return summary
def series_list(self):
"""List all series
Lines all series along with their description, number of patches
accepted and the available versions
"""
sdict = self.db.series_get_dict()
print(f"{'Name':15} {'Description':40} Accepted Versions")
border = f"{'-' * 15} {'-' * 40} -------- {'-' * 15}"
print(border)
for name in sorted(sdict):
ser = sdict[name]
versions = self._get_version_list(ser.idnum)
stat = self._series_get_version_stats(
ser.idnum, self._series_max_version(ser.idnum))[0]
vlist = ' '.join([str(ver) for ver in sorted(versions)])
print(f'{name:16.16} {ser.desc:41.41} {stat.rjust(8)} {vlist}')
print(border)
def list_patches(self, series, version, show_commit=False,
show_patch=False):
"""List patches in a series
Args:
series (str): Name of series to use, or None to use current branch
version (int): Version number, or None to detect from name
show_commit (bool): True to show the commit and diffstate
show_patch (bool): True to show the patch
"""
branch, series, version, pwc, name, _, cover_id, num_comments = (
self._get_patches(series, version))
with terminal.pager():
state_totals = defaultdict(int)
self._list_patches(branch, pwc, series, name, cover_id,
num_comments, show_commit, show_patch, True,
state_totals)
def mark(self, in_name, allow_marked=False, dry_run=False):
"""Add Change-Id tags to a series
Args:
in_name (str): Name of the series to unmark
allow_marked (bool): Allow commits to be (already) marked
dry_run (bool): True to do a dry run, restoring the original tree
afterwards
Return:
pygit.oid: oid of the new branch
"""
name, ser, _, _ = self.prep_series(in_name)
tout.info(f"Marking series '{name}': allow_marked {allow_marked}")
if not allow_marked:
bad = []
for cmt in ser.commits:
if cmt.change_id:
bad.append(cmt)
if bad:
print(f'{len(bad)} commit(s) already have marks')
for cmt in bad:
print(f' - {oid(cmt.hash)} {cmt.subject}')
raise ValueError(
f'Marked commits {len(bad)}/{len(ser.commits)}')
new_oid = self._mark_series(in_name, ser, dry_run=dry_run)
if dry_run:
tout.info('Dry run completed')
return new_oid
def unmark(self, name, allow_unmarked=False, dry_run=False):
"""Remove Change-Id tags from a series
Args:
name (str): Name of the series to unmark
allow_unmarked (bool): Allow commits to be (already) unmarked
dry_run (bool): True to do a dry run, restoring the original tree
afterwards
Return:
pygit.oid: oid of the new branch
"""
name, ser, _, _ = self.prep_series(name)
tout.info(
f"Unmarking series '{name}': allow_unmarked {allow_unmarked}")
if not allow_unmarked:
bad = []
for cmt in ser.commits:
if not cmt.change_id:
bad.append(cmt)
if bad:
print(f'{len(bad)} commit(s) are missing marks')
for cmt in bad:
print(f' - {oid(cmt.hash)} {cmt.subject}')
raise ValueError(
f'Unmarked commits {len(bad)}/{len(ser.commits)}')
vals = None
for vals in self.process_series(name, ser, dry_run=dry_run):
if cser_helper.CHANGE_ID_TAG in vals.msg:
lines = vals.msg.splitlines()
updated = [line for line in lines
if not line.startswith(cser_helper.CHANGE_ID_TAG)]
vals.msg = '\n'.join(updated)
tout.detail(" - removing mark")
vals.info = 'unmarked'
else:
vals.info = 'no mark'
if dry_run:
tout.info('Dry run completed')
return vals.oid
def open(self, pwork, name, version):
"""Open the patchwork page for a series
Args:
pwork (Patchwork): Patchwork object to use
name (str): Name of series to open
version (str): Version number to open
"""
ser, version = self._parse_series_and_version(name, version)
link = self.link_get(ser.name, version)
pwork.url = 'https://patchwork.ozlabs.org'
url = self.loop.run_until_complete(pwork.get_series_url(link))
print(f'Opening {url}')
# With Firefox, GTK produces lots of warnings, so suppress them
# Gtk-Message: 06:48:20.692: Failed to load module "xapp-gtk3-module"
# Gtk-Message: 06:48:20.692: Not loading module "atk-bridge": The
# functionality is provided by GTK natively. Please try to not load it.
# Gtk-Message: 06:48:20.692: Failed to load module "appmenu-gtk-module"
# Gtk-Message: 06:48:20.692: Failed to load module "appmenu-gtk-module"
# [262145, Main Thread] WARNING: GTK+ module /snap/firefox/5987/
# gnome-platform/usr/lib/gtk-2.0/modules/libcanberra-gtk-module.so
# cannot be loaded.
# GTK+ 2.x symbols detected. Using GTK+ 2.x and GTK+ 3 in the same
# process # is not supported.: 'glib warning', file /build/firefox/
# parts/firefox/build/toolkit/xre/nsSigHandlers.cpp:201
#
# (firefox_firefox:262145): Gtk-WARNING **: 06:48:20.728: GTK+ module
# /snap/firefox/5987/gnome-platform/usr/lib/gtk-2.0/modules/
# libcanberra-gtk-module.so cannot be loaded.
# GTK+ 2.x symbols detected. Using GTK+ 2.x and GTK+ 3 in the same
# process is not supported.
# Gtk-Message: 06:48:20.728: Failed to load module
# "canberra-gtk-module"
# [262145, Main Thread] WARNING: GTK+ module /snap/firefox/5987/
# gnome-platform/usr/lib/gtk-2.0/modules/libcanberra-gtk-module.so
# cannot be loaded.
# GTK+ 2.x symbols detected. Using GTK+ 2.x and GTK+ 3 in the same
# process is not supported.: 'glib warning', file /build/firefox/
# parts/firefox/build/toolkit/xre/nsSigHandlers.cpp:201
#
# (firefox_firefox:262145): Gtk-WARNING **: 06:48:20.729: GTK+ module
# /snap/firefox/5987/gnome-platform/usr/lib/gtk-2.0/modules/
# libcanberra-gtk-module.so cannot be loaded.
# GTK+ 2.x symbols detected. Using GTK+ 2.x and GTK+ 3 in the same
# process is not supported.
# Gtk-Message: 06:48:20.729: Failed to load module
# "canberra-gtk-module"
# ATTENTION: default value of option mesa_glthread overridden by
# environment.
cros_subprocess.Popen(['xdg-open', url])
def progress(self, series, show_all_versions, list_patches):
"""Show progress information for all versions in a series
Args:
series (str): Name of series to use, or None to show progress for
all series
show_all_versions (bool): True to show all versions of a series,
False to show only the final version
list_patches (bool): True to list all patches for each series,
False to just show the series summary on a single line
"""
with terminal.pager():
state_totals = defaultdict(int)
if series is not None:
_, _, need_scan = self._progress_one(
self._parse_series(series), show_all_versions,
list_patches, state_totals)
if need_scan:
tout.warning(
'Inconsistent commit-subject: Please use '
"'patman series -s <branch> scan' to resolve this")
return
total_patches = 0
total_series = 0
sdict = self.db.series_get_dict()
border = None
total_need_scan = 0
if not list_patches:
print(self.col.build(
self.col.MAGENTA,
f"{'Name':16} {'Description':41} Count {'Status'}"))
border = f"{'-' * 15} {'-' * 40} ----- {'-' * 15}"
print(border)
for name in sorted(sdict):
ser = sdict[name]
num_series, num_patches, need_scan = self._progress_one(
ser, show_all_versions, list_patches, state_totals)
total_need_scan += need_scan
if list_patches:
print()
total_series += num_series
total_patches += num_patches
if not list_patches:
print(border)
total = f'{total_series} series'
out = ''
for state, freq in state_totals.items():
out += ' ' + self._build_col(state, f'{freq}:')[0]
if total_need_scan:
out = '*' + out[1:]
print(f"{total:15} {'':40} {total_patches:5} {out}")
if total_need_scan:
tout.info(
f'Series marked * ({total_need_scan}) have commit '
'subjects which mismatch their patches and need to be '
'scanned')
def project_set(self, pwork, name, quiet=False):
"""Set the name of the project
Args:
pwork (Patchwork): Patchwork object to use
name (str): Name of the project to use in patchwork
quiet (bool): True to skip writing the message
"""
res = self.loop.run_until_complete(pwork.get_projects())
proj_id = None
link_name = None
for proj in res:
if proj['name'] == name:
proj_id = proj['id']
link_name = proj['link_name']
if not proj_id:
raise ValueError(f"Unknown project name '{name}'")
self.db.settings_update(name, proj_id, link_name)
self.commit()
if not quiet:
tout.info(f"Project '{name}' patchwork-ID {proj_id} "
f'link-name {link_name}')
def project_get(self):
"""Get the details of the project
Returns:
tuple or None if there are no settings:
name (str): Project name, e.g. 'U-Boot'
proj_id (int): Patchworks project ID for this project
link_name (str): Patchwork's link-name for the project
"""
return self.db.settings_get()
def remove(self, name, dry_run=False):
"""Remove a series from the database
Args:
name (str): Name of series to remove, or None to use current one
dry_run (bool): True to do a dry run
"""
ser = self._parse_series(name)
name = ser.name
if not ser.idnum:
raise ValueError(f"No such series '{name}'")
self.db.ser_ver_remove(ser.idnum, None)
if not dry_run:
self.commit()
else:
self.rollback()
self.commit()
tout.info(f"Removed series '{name}'")
if dry_run:
tout.info('Dry run completed')
def rename(self, series, name, dry_run=False):
"""Rename a series
Renames a series and changes the name of any branches which match
versions present in the database
Args:
series (str): Name of series to use, or None to use current branch
name (str): new name to use (must not include version number)
dry_run (bool): True to do a dry run
"""
old_ser, _ = self._parse_series_and_version(series, None)
if not old_ser.idnum:
raise ValueError(f"Series '{old_ser.name}' not found in database")
if old_ser.name != series:
raise ValueError(f"Invalid series name '{series}': "
'did you use the branch name?')
chk, _ = cser_helper.split_name_version(name)
if chk != name:
raise ValueError(
f"Invalid series name '{name}': did you use the branch name?")
if chk == old_ser.name:
raise ValueError(
f"Cannot rename series '{old_ser.name}' to itself")
if self.get_series_by_name(name):
raise ValueError(f"Cannot rename: series '{name}' already exists")
versions = self._get_version_list(old_ser.idnum)
missing = []
exists = []
todo = {}
for ver in versions:
ok = True
old_branch = self._get_branch_name(old_ser.name, ver)
if not gitutil.check_branch(old_branch, self.gitdir):
missing.append(old_branch)
ok = False
branch = self._get_branch_name(name, ver)
if gitutil.check_branch(branch, self.gitdir):
exists.append(branch)
ok = False
if ok:
todo[ver] = [old_branch, branch]
if missing or exists:
msg = 'Cannot rename'
if missing:
msg += f": branches missing: {', '.join(missing)}"
if exists:
msg += f": branches exist: {', '.join(exists)}"
raise ValueError(msg)
for old_branch, branch in todo.values():
tout.info(f"Renaming branch '{old_branch}' to '{branch}'")
if not dry_run:
gitutil.rename_branch(old_branch, branch, self.gitdir)
# Change the series name; nothing needs to change in ser_ver
self.db.series_set_name(old_ser.idnum, name)
if not dry_run:
self.commit()
else:
self.rollback()
tout.info(f"Renamed series '{series}' to '{name}'")
if dry_run:
tout.info('Dry run completed')
def scan(self, branch_name, mark=False, allow_unmarked=False, end=None,
dry_run=False):
"""Scan a branch and make updates to the database if it has changed
Args:
branch_name (str): Name of branch to sync, or None for current one
mark (str): True to mark each commit with a change ID
allow_unmarked (str): True to not require each commit to be marked
end (str): Add only commits up to but exclu
dry_run (bool): True to do a dry run
"""
def _show_item(oper, seq, subject):
col = None
if oper == '+':
col = self.col.GREEN
elif oper == '-':
col = self.col.RED
out = self.col.build(col, subject) if col else subject
tout.info(f'{oper} {seq:3} {out}')
name, ser, version, msg = self.prep_series(branch_name, end)
svid = self.get_ser_ver(ser.idnum, version).idnum
pcdict = self.get_pcommit_dict(svid)
tout.info(
f"Syncing series '{name}' v{version}: mark {mark} "
f'allow_unmarked {allow_unmarked}')
if msg:
tout.info(msg)
ser = self._handle_mark(name, ser, version, mark, allow_unmarked,
False, dry_run)
# First check for new patches that are not in the database
to_add = dict(enumerate(ser.commits))
for pcm in pcdict.values():
tout.debug(f'pcm {pcm.subject}')
i = self._find_matched_commit(to_add, pcm)
if i is not None:
del to_add[i]
# Now check for patches in the database that are not in the branch
to_remove = dict(enumerate(pcdict.values()))
for cmt in ser.commits:
tout.debug(f'cmt {cmt.subject}')
i = self._find_matched_patch(to_remove, cmt)
if i is not None:
del to_remove[i]
for seq, cmt in enumerate(ser.commits):
if seq in to_remove:
_show_item('-', seq, to_remove[seq].subject)
del to_remove[seq]
if seq in to_add:
_show_item('+', seq, to_add[seq].subject)
del to_add[seq]
else:
_show_item(' ', seq, cmt.subject)
seq = len(ser.commits)
for cmt in to_add.items():
_show_item('+', seq, cmt.subject)
seq += 1
for seq, pcm in to_remove.items():
_show_item('+', seq, pcm.subject)
self.db.pcommit_delete(svid)
self._add_series_commits(ser, svid)
if not dry_run:
self.commit()
else:
self.rollback()
tout.info('Dry run completed')
def send(self, pwork, name, autolink, autolink_wait, args):
"""Send out a series
Args:
pwork (Patchwork): Patchwork object to use
name (str): Series name to search for, or None for current series
that is checked out
autolink (bool): True to auto-link the series after sending
args (argparse.Namespace): 'send' arguments provided
autolink_wait (int): Number of seconds to wait for the autolink to
succeed
"""
ser, version = self._parse_series_and_version(name, None)
if not ser.idnum:
raise ValueError(f"Series '{ser.name}' not found in database")
args.branch = self._get_branch_name(ser.name, version)
likely_sent = send.send(args, git_dir=self.gitdir, cwd=self.topdir)
if likely_sent and autolink:
print(f'Autolinking with Patchwork ({autolink_wait} seconds)')
self.link_auto(pwork, name, version, True, wait_s=autolink_wait)
def archive(self, series):
"""Archive a series
Args:
series (str): Name of series to use, or None to use current branch
"""
ser = self._parse_series(series, include_archived=True)
if not ser.idnum:
raise ValueError(f"Series '{ser.name}' not found in database")
svlist = self.db.ser_ver_get_for_series(ser.idnum)
# Figure out the tags we will create
tag_info = {}
now = self.get_now()
now_str = now.strftime('%d%b%y').lower()
for svi in svlist:
name = self._get_branch_name(ser.name, svi.version)
if not gitutil.check_branch(name, git_dir=self.gitdir):
raise ValueError(f"No branch named '{name}'")
tag_info[svi.version] = [svi.idnum, name, f'{name}-{now_str}']
# Create the tags
repo = pygit2.init_repository(self.gitdir)
for _, (idnum, name, tag_name) in tag_info.items():
commit = repo.revparse_single(name)
repo.create_tag(tag_name, commit.hex,
pygit2.enums.ObjectType.COMMIT,
commit.author, commit.message)
# Update the database
for idnum, name, tag_name in tag_info.values():
self.db.ser_ver_set_archive_tag(idnum, tag_name)
# Delete the branches
for idnum, name, tag_name in tag_info.values():
# Detach HEAD from the branch if pointing to this branch
commit = repo.revparse_single(name)
if repo.head.target == commit.oid:
repo.set_head(commit.oid)
repo.branches.delete(name)
self.db.series_set_archived(ser.idnum, True)
self.commit()
def unarchive(self, series):
"""Unarchive a series
Args:
series (str): Name of series to use, or None to use current branch
"""
ser = self._parse_series(series, include_archived=True)
if not ser.idnum:
raise ValueError(f"Series '{ser.name}' not found in database")
self.db.series_set_archived(ser.idnum, False)
svlist = self.db.ser_ver_get_for_series(ser.idnum)
# Collect the tags
repo = pygit2.init_repository(self.gitdir)
tag_info = {}
for svi in svlist:
name = self._get_branch_name(ser.name, svi.version)
target = repo.revparse_single(svi.archive_tag)
tag_info[svi.idnum] = name, svi.archive_tag, target
# Make sure the branches don't exist
for name, tag_name, tag in tag_info.values():
if name in repo.branches:
raise ValueError(
f"Cannot restore branch '{name}': already exists")
# Recreate the branches
for name, tag_name, tag in tag_info.values():
target = repo.get(tag.target)
repo.branches.create(name, target)
# Delete the tags
for name, tag_name, tag in tag_info.values():
repo.references.delete(f'refs/tags/{tag_name}')
# Update the database
for idnum, (name, tag_name, tag) in tag_info.items():
self.db.ser_ver_set_archive_tag(idnum, None)
self.commit()
def status(self, pwork, series, version, show_comments,
show_cover_comments=False):
"""Show the series status from patchwork
Args:
pwork (Patchwork): Patchwork object to use
series (str): Name of series to use, or None to use current branch
version (int): Version number, or None to detect from name
show_comments (bool): Show all comments on each patch
show_cover_comments (bool): Show all comments on the cover letter
"""
branch, series, version, _, _, link, _, _ = self._get_patches(
series, version)
if not link:
raise ValueError(
f"Series '{series.name}' v{version} has no patchwork link: "
f"Try 'patman series -s {branch} autolink'")
status.check_and_show_status(
series, link, branch, None, False, show_comments,
show_cover_comments, pwork, self.gitdir)
def summary(self, series):
"""Show summary information for all series
Args:
series (str): Name of series to use
"""
print(f"{'Name':17} Status Description")
print(f"{'-' * 17} {'-' * 6} {'-' * 30}")
if series is not None:
self._summary_one(self._parse_series(series))
return
sdict = self.db.series_get_dict()
for ser in sdict.values():
self._summary_one(ser)
def gather(self, pwork, series, version, show_comments,
show_cover_comments, gather_tags, dry_run=False):
"""Gather any new tags from Patchwork, optionally showing comments
Args:
pwork (Patchwork): Patchwork object to use
series (str): Name of series to use, or None to use current branch
version (int): Version number, or None to detect from name
show_comments (bool): True to show the comments on each patch
show_cover_comments (bool): True to show the comments on the cover
letter
gather_tags (bool): True to gather review/test tags
dry_run (bool): True to do a dry run (database is not updated)
"""
ser, version = self._parse_series_and_version(series, version)
self._ensure_version(ser, version)
svid, link = self._get_series_svid_link(ser.idnum, version)
if not link:
raise ValueError(
"No patchwork link is available: use 'patman series autolink'")
tout.info(
f"Updating series '{ser.name}' version {version} "
f"from link '{link}'")
loop = asyncio.get_event_loop()
with pwork.collect_stats() as stats:
cover, patches = loop.run_until_complete(self._gather(
pwork, link, show_cover_comments))
with terminal.pager():
updated, updated_cover = self._sync_one(
svid, ser.name, version, show_comments, show_cover_comments,
gather_tags, cover, patches, dry_run)
tout.info(f"{updated} patch{'es' if updated != 1 else ''}"
f"{' and cover letter' if updated_cover else ''} "
f'updated ({stats.request_count} requests)')
if not dry_run:
self.commit()
else:
self.rollback()
tout.info('Dry run completed')
def gather_all(self, pwork, show_comments, show_cover_comments,
sync_all_versions, gather_tags, dry_run=False):
to_fetch, missing = self._get_fetch_dict(sync_all_versions)
loop = asyncio.get_event_loop()
result, requests = loop.run_until_complete(self._do_series_sync_all(
pwork, to_fetch))
with terminal.pager():
tot_updated = 0
tot_cover = 0
add_newline = False
for (svid, sync), (cover, patches) in zip(to_fetch.items(),
result):
if add_newline:
tout.info('')
tout.info(f"Syncing '{sync.series_name}' v{sync.version}")
updated, updated_cover = self._sync_one(
svid, sync.series_name, sync.version, show_comments,
show_cover_comments, gather_tags, cover, patches, dry_run)
tot_updated += updated
tot_cover += updated_cover
add_newline = gather_tags
tout.info('')
tout.info(
f"{tot_updated} patch{'es' if tot_updated != 1 else ''} and "
f"{tot_cover} cover letter{'s' if tot_cover != 1 else ''} "
f'updated, {missing} missing '
f"link{'s' if missing != 1 else ''} ({requests} requests)")
if not dry_run:
self.commit()
else:
self.rollback()
tout.info('Dry run completed')
def upstream_add(self, name, url):
"""Add a new upstream tree
Args:
name (str): Name of the tree
url (str): URL for the tree
"""
self.db.upstream_add(name, url)
self.commit()
def upstream_list(self):
"""List the upstream repos
Shows a list of the repos, obtained from the database
"""
udict = self.get_upstream_dict()
for name, items in udict.items():
url, is_default = items
default = 'default' if is_default else ''
print(f'{name:15.15} {default:8} {url}')
def upstream_set_default(self, name):
"""Set the default upstream target
Args:
name (str): Name of the upstream remote to set as default, or None
for none
"""
self.db.upstream_set_default(name)
self.commit()
def upstream_get_default(self):
"""Get the default upstream target
Return:
str: Name of the upstream remote to set as default, or None if none
"""
return self.db.upstream_get_default()
def upstream_delete(self, name):
"""Delete an upstream target
Args:
name (str): Name of the upstream remote to delete
"""
self.db.upstream_delete(name)
self.commit()
def version_remove(self, name, version, dry_run=False):
"""Remove a version of a series from the database
Args:
name (str): Name of series to remove, or None to use current one
version (int): Version number to remove
dry_run (bool): True to do a dry run
"""
ser, version = self._parse_series_and_version(name, version)
name = ser.name
versions = self._ensure_version(ser, version)
if versions == [version]:
raise ValueError(
f"Series '{ser.name}' only has one version: remove the series")
self.db.ser_ver_remove(ser.idnum, version)
if not dry_run:
self.commit()
else:
self.rollback()
tout.info(f"Removed version {version} from series '{name}'")
if dry_run:
tout.info('Dry run completed')
def version_change(self, name, version, new_version, dry_run=False):
"""Change a version of a series to be a different version
Args:
name (str): Name of series to remove, or None to use current one
version (int): Version number to change
new_version (int): New version
dry_run (bool): True to do a dry run
"""
ser, version = self._parse_series_and_version(name, version)
name = ser.name
versions = self._ensure_version(ser, version)
vstr = list(map(str, versions))
if version not in versions:
raise ValueError(
f"Series '{ser.name}' does not have v{version}: "
f"{' '.join(vstr)}")
if not new_version:
raise ValueError('Please provide a new version number')
if new_version in versions:
raise ValueError(
f"Series '{ser.name}' already has a v{new_version}: "
f"{' '.join(vstr)}")
new_name = self._join_name_version(ser.name, new_version)
svid = self.get_series_svid(ser.idnum, version)
pwc = self.get_pcommit_dict(svid)
count = len(pwc.values())
series = patchstream.get_metadata(name, 0, count, git_dir=self.gitdir)
self.update_series(name, series, version, new_name, dry_run,
add_vers=new_version, switch=True)
self.db.ser_ver_set_version(svid, new_version)
if not dry_run:
self.commit()
else:
self.rollback()
tout.info(f"Changed version {version} in series '{ser.name}' "
f"to {new_version} named '{new_name}'")
if dry_run:
tout.info('Dry run completed')