diff --git a/video2x/upscaler.py b/video2x/upscaler.py index 8e84eac..57e8bad 100755 --- a/video2x/upscaler.py +++ b/video2x/upscaler.py @@ -19,189 +19,174 @@ along with this program. If not, see . Name: Upscaler Author: K4YT3X Date Created: May 27, 2021 -Last Modified: March 20, 2022 +Last Modified: April 10, 2022 """ import math -import multiprocessing import queue -import signal -import time -from multiprocessing.managers import ListProxy -from multiprocessing.sharedctypes import Synchronized -from loguru import logger -from PIL import Image, ImageChops, ImageStat +from PIL import Image from realcugan_ncnn_vulkan_python import Realcugan from realsr_ncnn_vulkan_python import Realsr from srmd_ncnn_vulkan_python import Srmd from waifu2x_ncnn_vulkan_python import Waifu2x -# fixed scaling ratios supported by the algorithms -# that only support certain fixed scale ratios -ALGORITHM_FIXED_SCALING_RATIOS = { - "waifu2x": [1, 2], - "srmd": [2, 3, 4], - "realsr": [4], - "realcugan": [1, 2, 3, 4], -} - -ALGORITHM_CLASSES = { - "waifu2x": Waifu2x, - "srmd": Srmd, - "realsr": Realsr, - "realcugan": Realcugan, -} +from .processor import Processor -class Upscaler(multiprocessing.Process): - def __init__( +class Upscaler: + # fixed scaling ratios supported by the algorithms + # that only support certain fixed scale ratios + ALGORITHM_FIXED_SCALING_RATIOS = { + "waifu2x": [1, 2], + "srmd": [2, 3, 4], + "realsr": [4], + "realcugan": [1, 2, 3, 4], + } + + ALGORITHM_CLASSES = { + "waifu2x": Waifu2x, + "srmd": Srmd, + "realsr": Realsr, + "realcugan": Realcugan, + } + + processor_objects = {} + + @staticmethod + def _get_scaling_tasks( + input_width: int, + input_height: int, + output_width: int, + output_height: int, + algorithm: str, + ) -> list: + """ + Get the required tasks for upscaling the image until it is larger than + or equal to the desired output dimensions. For example, SRMD only supports + 2x, 3x, and 4x, so upsclaing an image from 320x240 to 3840x2160 will + require the SRMD to run 3x then 4x. In this case, this function will + return [3, 4]. + + :param input_width int: input image width + :param input_height int: input image height + :param output_width int: desired output image width + :param output_height int: desired output image size + :param algorithm str: upsclaing algorithm + :rtype list: the list of upsclaing tasks required + """ + # calculate required minimum scale ratio + output_scale = max(output_width / input_width, output_height / input_height) + + # select the optimal algorithm scaling ratio to use + supported_scaling_ratios = sorted( + Upscaler.ALGORITHM_FIXED_SCALING_RATIOS[algorithm] + ) + + remaining_scaling_ratio = math.ceil(output_scale) + + # if the scaling ratio is 1.0 + # apply the smallest scaling ratio available + if remaining_scaling_ratio == 1: + return [supported_scaling_ratios[0]] + + scaling_jobs = [] + while remaining_scaling_ratio > 1: + for ratio in supported_scaling_ratios: + if ratio >= remaining_scaling_ratio: + scaling_jobs.append(ratio) + remaining_scaling_ratio /= ratio + break + + else: + found = False + for i in supported_scaling_ratios: + for j in supported_scaling_ratios: + if i * j >= remaining_scaling_ratio: + scaling_jobs.extend([i, j]) + remaining_scaling_ratio /= i * j + found = True + break + if found is True: + break + + if found is False: + scaling_jobs.append(supported_scaling_ratios[-1]) + remaining_scaling_ratio /= supported_scaling_ratios[-1] + return scaling_jobs + + def upscale_image( self, - processing_queue: multiprocessing.Queue, - processed_frames: ListProxy, - pause: Synchronized, - ) -> None: - multiprocessing.Process.__init__(self) - self.running = False - self.processing_queue = processing_queue - self.processed_frames = processed_frames - self.pause = pause + image: Image.Image, + output_width: int, + output_height: int, + algorithm: str, + noise: int, + ) -> Image.Image: + """ + upscale an image - signal.signal(signal.SIGTERM, self._stop) + :param image Image.Image: the image to upscale + :param output_width int: the desired output width + :param output_height int: the desired output height + :param algorithm str: the algorithm to use + :param noise int: the noise level (available only for some algorithms) + :rtype Image.Image: the upscaled image + """ + width, height = image.size - def run(self) -> None: - self.running = True - logger.opt(colors=True).info( - f"Upscaler process {self.name} initiating" - ) - processor_objects = {} - while self.running is True: - try: - # pause if pause flag is set - if self.pause.value is True: - time.sleep(0.1) - continue + for task in self._get_scaling_tasks( + width, height, output_width, output_height, algorithm + ): - try: - # get new job from queue - ( - frame_index, - (image0, image1), - ( - output_width, - output_height, - noise, - difference_threshold, - algorithm, - ), - ) = self.processing_queue.get(False) + # select a processor object with the required settings + # create a new object if none are available + processor_object = self.processor_objects.get((algorithm, task)) + if processor_object is None: + processor_object = self.ALGORITHM_CLASSES[algorithm]( + noise=noise, scale=task + ) + self.processor_objects[(algorithm, task)] = processor_object - # destructure settings - except queue.Empty: - time.sleep(0.1) - continue + # process the image with the selected algorithm + image = processor_object.process(image) - difference_ratio = 0 - if image0 is not None: - difference = ImageChops.difference(image0, image1) - difference_stat = ImageStat.Stat(difference) - difference_ratio = ( - sum(difference_stat.mean) - / (len(difference_stat.mean) * 255) - * 100 - ) + # downscale the image to the desired output size and + # save the image to disk + return image.resize((output_width, output_height), Image.Resampling.LANCZOS) - # if the difference is lower than threshold - # skip this frame - if difference_ratio < difference_threshold: - # make sure the previous frame has been processed - if frame_index > 0: - while self.processed_frames[frame_index - 1] is None: - time.sleep(0.1) +class UpscalerProcessor(Processor, Upscaler): + def process(self) -> None: - # make the current image the same as the previous result - self.processed_frames[frame_index] = self.processed_frames[ - frame_index - 1 - ] + task = self.tasks_queue.get() + while task is not None: - # if the difference is greater than threshold - # process this frame - else: - width, height = image1.size + # unpack the task's values + ( + frame_index, + previous_frame, + current_frame, + processing_settings, + ) = task - # calculate required minimum scale ratio - output_scale = max(output_width / width, output_height / height) + # calculate the %diff between the current frame and the previous frame + difference_ratio = 0 + if previous_frame is not None: + difference_ratio = self.get_image_diff(previous_frame, current_frame) - # select the optimal algorithm scaling ratio to use - supported_scaling_ratios = sorted( - ALGORITHM_FIXED_SCALING_RATIOS[algorithm] - ) + # if the difference is lower than threshold, skip this frame + if difference_ratio < processing_settings["difference_threshold"]: - remaining_scaling_ratio = math.ceil(output_scale) - scaling_jobs = [] + # make the current image the same as the previous result + self.processed_frames[frame_index] = True - # if the scaling ratio is 1.0 - # apply the smallest scaling ratio available - if remaining_scaling_ratio == 1: - scaling_jobs.append(supported_scaling_ratios[0]) - else: - while remaining_scaling_ratio > 1: - for ratio in supported_scaling_ratios: - if ratio >= remaining_scaling_ratio: - scaling_jobs.append(ratio) - remaining_scaling_ratio /= ratio - break + # if the difference is greater than threshold + # process this frame + else: + self.processed_frames[frame_index] = self.upscale_image( + **processing_settings + ) - else: - found = False - for i in supported_scaling_ratios: - for j in supported_scaling_ratios: - if i * j >= remaining_scaling_ratio: - scaling_jobs.extend([i, j]) - remaining_scaling_ratio /= i * j - found = True - break - if found is True: - break - - if found is False: - scaling_jobs.append(supported_scaling_ratios[-1]) - remaining_scaling_ratio /= supported_scaling_ratios[ - -1 - ] - - for job in scaling_jobs: - - # select a processor object with the required settings - # create a new object if none are available - processor_object = processor_objects.get((algorithm, job)) - if processor_object is None: - processor_object = ALGORITHM_CLASSES[algorithm]( - noise=noise, scale=job - ) - processor_objects[(algorithm, job)] = processor_object - - # process the image with the selected algorithm - image1 = processor_object.process(image1) - - # downscale the image to the desired output size and - # save the image to disk - image1 = image1.resize((output_width, output_height), Image.LANCZOS) - self.processed_frames[frame_index] = image1 - - # send exceptions into the client connection pipe - except (SystemExit, KeyboardInterrupt): - break - - except Exception as error: - logger.exception(error) - break - - logger.opt(colors=True).info( - f"Upscaler process {self.name} terminating" - ) - return super().run() - - def _stop(self, _signal_number, _frame) -> None: - self.running = False + self.tasks_queue.task_done() + task = self.tasks_queue.get()