144 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import importlib
 | |
| import importlib.util
 | |
| import subprocess
 | |
| import sys
 | |
| import re
 | |
| import logging
 | |
| 
 | |
| from functools import lru_cache
 | |
| 
 | |
# Quieten two known-noisy third-party loggers before anything else runs.
logging.getLogger("torch.distributed.nn").setLevel(logging.ERROR)  # sshh...
logging.getLogger("xformers").addFilter(
    lambda record: 'A matching Triton is not available' not in record.getMessage()
)

# Interpreter and git executables used to build the shell commands below.
python = sys.executable
git = os.getenv('GIT', "git")

# LAUNCH_LIVE_OUTPUT=1 makes child processes write straight to the console
# instead of having their output captured.
default_command_live = os.getenv('LAUNCH_LIVE_OUTPUT') == "1"

# Optional custom package index for pip; empty string means "use pip's default".
index_url = os.getenv('INDEX_URL', "")

# Directory containing this file, and its parent.
_this_file = os.path.realpath(__file__)
modules_path = os.path.dirname(_this_file)
script_path = os.path.dirname(modules_path)

# Name of the sub-directory (under script_path) that holds cloned repositories.
dir_repos = "repositories"

fooocus_tag = '1.0.0'
 | |
| 
 | |
| 
 | |
@lru_cache()
def commit_hash():
    """Return the hash of the current git HEAD, or "<none>" if it cannot be read.

    The result is cached by ``lru_cache``, so git is invoked at most once per
    process. Any failure (git missing, not a repository, decode error, ...) is
    swallowed and reported as the placeholder string.
    """
    try:
        head = subprocess.check_output(
            [git, "rev-parse", "HEAD"],
            shell=False,
            encoding='utf8',
        )
        return head.strip()
    except Exception:
        return "<none>"
 | |
| 
 | |
| 
 | |
def git_clone(url, dir, name, commithash=None):
    """Clone ``url`` into ``dir``, or move an existing checkout to ``commithash``.

    Args:
        url: Repository URL passed to ``git clone``.
        dir: Target directory. (Shadows the ``dir`` builtin; the name is kept
            because callers may pass it by keyword.)
        name: Human-readable repository name used in progress/error messages.
        commithash: Commit to check out. When ``None`` an existing directory is
            left untouched and a fresh clone stays on its default branch.

    Raises:
        RuntimeError: propagated from ``run`` when a git command exits with a
            non-zero status.

    NOTE: ``url``, ``dir`` and ``commithash`` are interpolated into a shell
    command string; they must come from trusted configuration.
    """
    # TODO clone into temporary dir and move if successful

    if os.path.exists(dir):
        if commithash is None:
            return

        # Skip the network round-trip when the checkout is already at the
        # requested commit.
        current_hash = run(f'"{git}" -C "{dir}" rev-parse HEAD', None,
                           f"Couldn't determine {name}'s hash: {commithash}", live=False).strip()
        if current_hash == commithash:
            return

        run(f'"{git}" -C "{dir}" fetch', f"Fetching updates for {name}...", f"Couldn't fetch {name}")
        run(f'"{git}" -C "{dir}" checkout {commithash}', f"Checking out commit for {name} with hash: {commithash}...",
            f"Couldn't checkout commit {commithash} for {name}", live=True)
        return

    run(f'"{git}" clone "{url}" "{dir}"', f"Cloning {name} into {dir}...", f"Couldn't clone {name}", live=True)

    if commithash is not None:
        # The error description must be an f-string so the message names the
        # repository and commit instead of printing the raw placeholders.
        run(f'"{git}" -C "{dir}" checkout {commithash}', None, f"Couldn't checkout {name}'s hash: {commithash}")
 | |
| 
 | |
| 
 | |
def repo_dir(name):
    """Return the path of repository ``name`` inside the repositories folder.

    The folder is ``dir_repos`` located directly under ``script_path``.
    """
    repos_root = os.path.join(script_path, dir_repos)
    return os.path.join(repos_root, name)
 | |
| 
 | |
| 
 | |
def is_installed(package):
    """Report whether ``package`` can be located by the import system.

    Uses ``importlib.util.find_spec``. A dotted name whose parent package is
    missing makes ``find_spec`` raise ``ModuleNotFoundError``; that case is
    reported as "not installed" too.
    """
    try:
        found = importlib.util.find_spec(package) is not None
    except ModuleNotFoundError:
        found = False
    return found
 | |
| 
 | |
| 
 | |
def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str:
    """Run ``command`` through the shell and return its captured stdout.

    Args:
        command: Command line, executed with ``shell=True``.
        desc: Optional message printed before the command starts.
        errdesc: Optional headline for the error raised on failure.
        custom_env: Environment for the child process; defaults to ``os.environ``.
        live: When true, the child's output is not captured (it goes to the
            parent's streams) and the returned string is empty.

    Returns:
        The child's stdout decoded as UTF-8 (undecodable bytes ignored), or
        ``""`` when nothing was captured.

    Raises:
        RuntimeError: if the command exits with a non-zero status. The message
            contains the command, the exit code and any captured output.
    """
    if desc is not None:
        print(desc)

    child_env = custom_env if custom_env is not None else os.environ

    # Only capture the streams when output is not meant to be shown live.
    capture = {} if live else {"stdout": subprocess.PIPE, "stderr": subprocess.PIPE}

    result = subprocess.run(
        args=command,
        shell=True,
        env=child_env,
        encoding='utf8',
        errors='ignore',
        **capture,
    )

    if result.returncode == 0:
        return (result.stdout or "")

    report = [
        f"{errdesc or 'Error running command'}.",
        f"Command: {command}",
        f"Error code: {result.returncode}",
    ]
    for label, text in (("stdout", result.stdout), ("stderr", result.stderr)):
        if text:
            report.append(f"{label}: {text}")
    raise RuntimeError("\n".join(report))
 | |
| 
 | |
| 
 | |
def run_pip(command, desc=None, live=default_command_live):
    """Invoke ``pip`` with the current interpreter and return its captured output.

    ``command`` is the pip sub-command and arguments (for example
    ``"install foo"``). ``--prefer-binary`` is always appended, followed by
    ``--index-url`` when ``index_url`` is non-empty. ``desc`` is used in the
    "Installing ..." progress line and in the error description. Failures
    surface as the ``RuntimeError`` raised by ``run``.
    """
    index_url_line = ''
    if index_url != '':
        index_url_line = f' --index-url {index_url}'

    pip_command = f'"{python}" -m pip {command} --prefer-binary{index_url_line}'
    return run(
        pip_command,
        desc=f"Installing {desc}",
        errdesc=f"Couldn't install {desc}",
        live=live,
    )
 | |
| 
 | |
| 
 | |
# Matches a leading "name" optionally followed by "== version". Only the start
# of the line is examined; other specifiers (">=", markers, ...) are not parsed.
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")


def requirements_met(requirements_file):
    """
    Does a simple parse of a requirements.txt file to determine if all requirements in it
    are already installed. Returns True if so, False if not installed or parsing fails.

    Rules applied to every non-blank line:
      * a line that does not start with a package name returns False;
      * the named package must be installed (any version is accepted when the
        line carries no ``==`` pin);
      * when a ``==`` pin is present, the installed version must compare equal
        to it under ``packaging.version`` ordering.

    Raises whatever ``open`` raises if the file cannot be read.
    """

    import importlib.metadata
    import packaging.version

    with open(requirements_file, "r", encoding="utf8") as file:
        for line in file:
            if line.strip() == "":
                continue

            m = re_requirement.match(line)
            if m is None:
                return False

            package = m.group(1).strip()
            version_required = (m.group(2) or "").strip()

            # Look the package up before checking the pin, so an unpinned
            # requirement that is not installed is reported as unmet.
            try:
                version_installed = importlib.metadata.version(package)
            except Exception:
                return False

            if version_required == "":
                continue

            # An unparsable version string counts as "parsing failed" (False)
            # rather than propagating out of this check.
            try:
                versions_differ = (
                    packaging.version.parse(version_required)
                    != packaging.version.parse(version_installed)
                )
            except (packaging.version.InvalidVersion, TypeError):
                return False

            if versions_differ:
                return False

    return True
 |