155 lines
4.5 KiB
Python
155 lines
4.5 KiB
Python
import os
|
|
import importlib
|
|
import importlib.util
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import re
|
|
import logging
|
|
import pygit2
|
|
|
|
|
|
# Set libgit2's owner-validation option to 0 (i.e. switched off) for this process.
pygit2.option(pygit2.GIT_OPT_SET_OWNER_VALIDATION, 0)

# Quieten third-party loggers: only ERROR and above from "torch.distributed.nn",
# and drop any "xformers" record whose message mentions the missing Triton build.
logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR)  # sshh...
logging.getLogger("xformers").addFilter(
    lambda record: 'A matching Triton is not available' not in record.getMessage()
)
|
|
|
|
# Matches the leading "name" or "name==version" part of a requirements.txt line.
# Group 1 is the package name, group 2 the pinned version (None when not pinned).
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")

# Interpreter used to spawn pip.
python = sys.executable

# Environment-driven settings: LAUNCH_LIVE_OUTPUT=1 turns live command output on by
# default, INDEX_URL (empty when unset) is forwarded to pip as --index-url.
default_command_live = os.environ.get('LAUNCH_LIVE_OUTPUT') == "1"
index_url = os.environ.get('INDEX_URL', "")

# Directory containing this file, its parent, and the sub-directory name used for
# cloned repositories.
modules_path = os.path.dirname(os.path.realpath(__file__))
script_path = os.path.dirname(modules_path)
dir_repos = "repositories"
|
|
|
|
|
|
def onerror(func, path, exc_info):
    """Error handler for ``shutil.rmtree`` that retries on non-writable paths.

    When *path* is not writable, the owner-write permission bit is set and
    *func* is called on it again. When *path* is already writable the failure
    is not something this handler can repair, so an error is raised.

    Args:
        func: The callable that failed; retried as ``func(path)``.
        path: The path *func* was operating on.
        exc_info: Details of the failure. A ``sys.exc_info()``-style tuple or
            an exception instance is used as the cause of the raised error;
            anything else is ignored.

    Raises:
        RuntimeError: If *path* is writable, i.e. the retry cannot help.
    """
    import stat

    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)
        func(path)
        return

    # Raising a bare string is a TypeError in Python 3 and would hide the message,
    # so raise a real exception and keep the original failure as its cause.
    cause = None
    if isinstance(exc_info, BaseException):
        cause = exc_info
    elif isinstance(exc_info, tuple) and len(exc_info) > 1 and isinstance(exc_info[1], BaseException):
        cause = exc_info[1]

    raise RuntimeError('Failed to invoke "shutil.rmtree", git management failed.') from cause
|
|
|
|
|
|
def git_clone(url, dir, name, hash=None):
    """Ensure *dir* holds a clone of *url*, optionally checked out at *hash*.

    An existing repository at *dir* is reused only when its ``origin`` remote
    URL equals *url*. In every other case (nothing there, not a repository,
    no ``origin`` remote, different URL) *dir* is removed and the repository
    is cloned afresh. Any failure is reported with ``print`` and is not
    propagated to the caller.

    Args:
        url: Remote URL to clone from.
        dir: Local directory of the working copy.
        name: Name used in the progress messages.
        hash: Commit to check out after fetching ``origin``. ``None`` skips
            the fetch/checkout step.
    """
    try:
        repo = None
        try:
            repo = pygit2.Repository(dir)
            if repo.remotes['origin'].url == url:
                print(f'{name} exists and URL is correct.')
            else:
                print(f'{name} exists but remote URL will be updated.')
                # Drop the handle before the directory is deleted below.
                repo = None
        except Exception:
            # Not a usable repository (missing, corrupt, no 'origin'): clone afresh.
            # Only Exception is caught so Ctrl-C / SystemExit are not swallowed.
            repo = None

        if repo is None:
            if os.path.exists(dir):
                shutil.rmtree(dir, onerror=onerror)
            os.makedirs(dir, exist_ok=True)
            repo = pygit2.clone_repository(url, dir)
            print(f'{name} cloned from {url}.')

        if hash is not None:
            repo.remotes['origin'].fetch()

            commit = repo.get(hash)
            if commit is None:
                # Without this the failure would show up as an AttributeError on None.
                raise ValueError(f'commit {hash} was not found after fetching origin')

            repo.checkout_tree(commit, strategy=pygit2.GIT_CHECKOUT_FORCE)
            repo.set_head(commit.id)
            print(f'{name} checkout finished for {hash}.')
    except Exception as e:
        print(f'Git clone failed for {name}: {str(e)}')
|
|
|
|
|
|
def repo_dir(name):
    """Return the path of repository *name* inside the repositories directory."""
    repos_root = os.path.join(script_path, dir_repos)
    return os.path.join(repos_root, name)
|
|
|
|
|
|
def is_installed(package):
    """Report whether the import system can locate *package*.

    Returns ``False`` when ``importlib.util.find_spec`` finds no spec or raises
    ``ModuleNotFoundError``, and ``True`` otherwise.
    """
    try:
        found = importlib.util.find_spec(package) is not None
    except ModuleNotFoundError:
        found = False
    return found
|
|
|
|
|
|
def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str:
    """Run *command* through the shell and return its captured standard output.

    Args:
        command: Command line handed to ``subprocess.run`` with ``shell=True``.
        desc: Optional message printed before the command starts.
        errdesc: Optional first line of the error message on failure;
            defaults to ``'Error running command'``.
        custom_env: Environment for the child process; the current
            ``os.environ`` is used when this is ``None``.
        live: When true the output streams are not captured, so the returned
            string is empty.

    Returns:
        The captured stdout, or ``""`` when nothing was captured.

    Raises:
        RuntimeError: If the command exits with a non-zero return code. The
            message contains the command, the code and any captured output.
    """
    if desc is not None:
        print(desc)

    # None leaves a stream un-redirected; PIPE captures it on the result object.
    stream = None if live else subprocess.PIPE
    environment = custom_env if custom_env is not None else os.environ

    completed = subprocess.run(
        command,
        shell=True,
        env=environment,
        encoding='utf8',
        errors='ignore',
        stdout=stream,
        stderr=stream,
    )

    if completed.returncode == 0:
        return completed.stdout or ""

    report = [
        f"{errdesc or 'Error running command'}.",
        f"Command: {command}",
        f"Error code: {completed.returncode}",
    ]
    for label, text in (("stdout", completed.stdout), ("stderr", completed.stderr)):
        if text:
            report.append(f"{label}: {text}")
    raise RuntimeError("\n".join(report))
|
|
|
|
|
|
def run_pip(command, desc=None, live=default_command_live):
    """Run ``pip`` with *command* using the current interpreter.

    ``--prefer-binary`` is always appended, followed by ``--index-url`` when the
    module-level ``index_url`` is non-empty.

    Args:
        command: Arguments placed after ``-m pip``.
        desc: Label used in the progress and failure messages.
        live: Passed through to ``run``.

    Returns:
        Whatever ``run`` returns, or ``None`` if anything raised; in that case
        the exception and the failed command are printed.
    """
    try:
        index_url_line = '' if index_url == '' else f' --index-url {index_url}'
        pip_command = f'"{python}" -m pip {command} --prefer-binary{index_url_line}'
        return run(
            pip_command,
            desc=f"Installing {desc}",
            errdesc=f"Couldn't install {desc}",
            live=live,
        )
    except Exception as e:
        print(e)
        print(f'CMD Failed {desc}: {command}')
        return None
|
|
|
|
|
|
def requirements_met(requirements_file):
    """
    Does a simple parse of a requirements.txt file to determine if all requirements in it
    are already installed. Returns True if so, False if not installed or parsing fails.

    Only requirements pinned as ``name==version`` are compared against the installed
    distribution; blank lines and lines without an ``==`` pin are skipped unchecked.
    A line that does not start with a package name, a pinned package that is not
    installed, a version mismatch or an unparsable version string all yield False.
    """

    import importlib.metadata
    import packaging.version

    with open(requirements_file, "r", encoding="utf8") as file:
        for line in file:
            if line.strip() == "":
                continue

            m = re.match(re_requirement, line)
            if m is None:
                return False

            package = m.group(1).strip()
            version_required = (m.group(2) or "").strip()

            if version_required == "":
                continue

            try:
                version_installed = importlib.metadata.version(package)
            except Exception:
                return False

            try:
                versions_match = (
                    packaging.version.parse(version_required)
                    == packaging.version.parse(version_installed)
                )
            except packaging.version.InvalidVersion:
                # An unparsable version counts as "parsing fails": report not met
                # instead of letting the exception escape.
                return False

            if not versions_match:
                return False

    return True
|