135 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import importlib
 | 
						|
import importlib.util
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import re
 | 
						|
import logging
 | 
						|
import pygit2
 | 
						|
pygit2.option(pygit2.GIT_OPT_SET_OWNER_VALIDATION, 0)
 | 
						|
 | 
						|
 | 
						|
logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR)  # sshh...
 | 
						|
logging.getLogger("xformers").addFilter(lambda record: 'A matching Triton is not available' not in record.getMessage())
 | 
						|
 | 
						|
python = sys.executable
 | 
						|
default_command_live = (os.environ.get('LAUNCH_LIVE_OUTPUT') == "1")
 | 
						|
index_url = os.environ.get('INDEX_URL', "")
 | 
						|
 | 
						|
modules_path = os.path.dirname(os.path.realpath(__file__))
 | 
						|
script_path = os.path.dirname(modules_path)
 | 
						|
dir_repos = "repositories"
 | 
						|
 | 
						|
 | 
						|
def git_clone(url, dir, name, hash=None):
 | 
						|
    try:
 | 
						|
        try:
 | 
						|
            repo = pygit2.Repository(dir)
 | 
						|
            print(f'{name} exists.')
 | 
						|
        except:
 | 
						|
            if os.path.exists(dir):
 | 
						|
                shutil.rmtree(dir, ignore_errors=True)
 | 
						|
            os.makedirs(dir, exist_ok=True)
 | 
						|
            repo = pygit2.clone_repository(url, dir)
 | 
						|
            print(f'{name} cloned.')
 | 
						|
 | 
						|
        remote = repo.remotes['origin']
 | 
						|
        remote.fetch()
 | 
						|
 | 
						|
        commit = repo.get(hash)
 | 
						|
 | 
						|
        repo.checkout_tree(commit, strategy=pygit2.GIT_CHECKOUT_FORCE)
 | 
						|
        print(f'{name} checkout finished.')
 | 
						|
    except Exception as e:
 | 
						|
        print(f'Git clone failed for {name}: {str(e)}')
 | 
						|
 | 
						|
 | 
						|
def repo_dir(name):
 | 
						|
    return os.path.join(script_path, dir_repos, name)
 | 
						|
 | 
						|
 | 
						|
def is_installed(package):
 | 
						|
    try:
 | 
						|
        spec = importlib.util.find_spec(package)
 | 
						|
    except ModuleNotFoundError:
 | 
						|
        return False
 | 
						|
 | 
						|
    return spec is not None
 | 
						|
 | 
						|
 | 
						|
def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str:
 | 
						|
    if desc is not None:
 | 
						|
        print(desc)
 | 
						|
 | 
						|
    run_kwargs = {
 | 
						|
        "args": command,
 | 
						|
        "shell": True,
 | 
						|
        "env": os.environ if custom_env is None else custom_env,
 | 
						|
        "encoding": 'utf8',
 | 
						|
        "errors": 'ignore',
 | 
						|
    }
 | 
						|
 | 
						|
    if not live:
 | 
						|
        run_kwargs["stdout"] = run_kwargs["stderr"] = subprocess.PIPE
 | 
						|
 | 
						|
    result = subprocess.run(**run_kwargs)
 | 
						|
 | 
						|
    if result.returncode != 0:
 | 
						|
        error_bits = [
 | 
						|
            f"{errdesc or 'Error running command'}.",
 | 
						|
            f"Command: {command}",
 | 
						|
            f"Error code: {result.returncode}",
 | 
						|
        ]
 | 
						|
        if result.stdout:
 | 
						|
            error_bits.append(f"stdout: {result.stdout}")
 | 
						|
        if result.stderr:
 | 
						|
            error_bits.append(f"stderr: {result.stderr}")
 | 
						|
        raise RuntimeError("\n".join(error_bits))
 | 
						|
 | 
						|
    return (result.stdout or "")
 | 
						|
 | 
						|
 | 
						|
def run_pip(command, desc=None, live=default_command_live):
 | 
						|
    index_url_line = f' --index-url {index_url}' if index_url != '' else ''
 | 
						|
    return run(f'"{python}" -m pip {command} --prefer-binary{index_url_line}', desc=f"Installing {desc}",
 | 
						|
               errdesc=f"Couldn't install {desc}", live=live)
 | 
						|
 | 
						|
 | 
						|
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")
 | 
						|
 | 
						|
 | 
						|
def requirements_met(requirements_file):
 | 
						|
    """
 | 
						|
    Does a simple parse of a requirements.txt file to determine if all rerqirements in it
 | 
						|
    are already installed. Returns True if so, False if not installed or parsing fails.
 | 
						|
    """
 | 
						|
 | 
						|
    import importlib.metadata
 | 
						|
    import packaging.version
 | 
						|
 | 
						|
    with open(requirements_file, "r", encoding="utf8") as file:
 | 
						|
        for line in file:
 | 
						|
            if line.strip() == "":
 | 
						|
                continue
 | 
						|
 | 
						|
            m = re.match(re_requirement, line)
 | 
						|
            if m is None:
 | 
						|
                return False
 | 
						|
 | 
						|
            package = m.group(1).strip()
 | 
						|
            version_required = (m.group(2) or "").strip()
 | 
						|
 | 
						|
            if version_required == "":
 | 
						|
                continue
 | 
						|
 | 
						|
            try:
 | 
						|
                version_installed = importlib.metadata.version(package)
 | 
						|
            except Exception:
 | 
						|
                return False
 | 
						|
 | 
						|
            if packaging.version.parse(version_required) != packaging.version.parse(version_installed):
 | 
						|
                return False
 | 
						|
 | 
						|
    return True
 |