mirror of
https://github.com/k4yt3x/video2x.git
synced 2025-11-15 03:21:00 +01:00
96 lines
3.0 KiB
Python
Executable File
96 lines
3.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Copyright (C) 2018-2023 K4YT3X and contributors.
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as
|
|
published by the Free Software Foundation, either version 3 of the
|
|
License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
Name: Interpolator
|
|
Author: K4YT3X
|
|
Date Created: May 27, 2021
|
|
Last Modified: March 20, 2022
|
|
"""
|
|
|
|
import time
|
|
|
|
from loguru import logger
|
|
from PIL import ImageChops, ImageStat
|
|
from rife_ncnn_vulkan_python.rife_ncnn_vulkan import Rife
|
|
|
|
from .processor import Processor
|
|
|
|
|
|
class Interpolator:
    """Produce an intermediate frame between two images.

    Interpolation back-ends are looked up by name in ``ALGORITHM_CLASSES``
    and instantiated on first use; the instances are kept in
    ``processor_objects`` so later calls reuse them.
    """

    # Maps an algorithm name to the class that implements it.
    ALGORITHM_CLASSES = {"rife": Rife}

    # Back-end instances keyed by algorithm name. This is a class attribute,
    # so the same dict is seen by every instance that does not shadow it.
    processor_objects = {}

    def interpolate_image(self, image0, image1, difference_threshold, algorithm):
        """Return the frame to insert between ``image0`` and ``image1``.

        The per-band means of the absolute difference between the two images
        are averaged and scaled by 255 into a percentage. When that
        percentage is below ``difference_threshold`` the selected back-end
        interpolates the pair; otherwise ``image0`` itself is returned.

        Args:
            image0: first image of the pair.
            image1: second image of the pair.
            difference_threshold: percentage below which interpolation runs.
            algorithm: key into ``ALGORITHM_CLASSES`` (e.g. ``"rife"``).

        Returns:
            The back-end's output for the pair, or ``image0`` unchanged.

        Raises:
            KeyError: if ``algorithm`` is not a key of ``ALGORITHM_CLASSES``
                and no instance is cached for it.
        """
        band_means = ImageStat.Stat(ImageChops.difference(image0, image1)).mean
        difference_ratio = sum(band_means) / (len(band_means) * 255) * 100

        # Pairs that are not below the threshold are not interpolated; the
        # first image is handed back as the in-between frame.
        if not (difference_ratio < difference_threshold):
            return image0

        backend = self.processor_objects.get(algorithm)
        if backend is None:
            # NOTE(review): the positional 0 is presumably a GPU/device index
            # -- confirm against the back-end's constructor.
            backend = self.ALGORITHM_CLASSES[algorithm](0)
            self.processor_objects[algorithm] = backend

        return backend.process(image0, image1)
|
|
|
|
|
|
class InterpolatorProcessor(Processor, Interpolator):
    """Worker that takes frame pairs from the task queue and interpolates them."""

    def process(self) -> None:
        """Consume tasks until a ``None`` task arrives or an error occurs.

        Each task is unpacked as
        ``(frame_index, image0, image1, (difference_threshold, algorithm))``.
        For a pair with index ``n`` the interpolated frame is stored at
        ``processed_frames[2 * n - 1]`` and ``image1`` at
        ``processed_frames[2 * n]``; the pair with index 1 also stores
        ``image0`` at position 0. Tasks whose ``image0`` is ``None`` are
        skipped.

        ``SystemExit`` and ``KeyboardInterrupt`` end the loop silently; any
        other exception is logged and also ends the loop.
        """
        # NOTE(review): tasks_queue, pause_flag and processed_frames are
        # presumably provided by Processor -- they are not defined here.
        task = self.tasks_queue.get()
        while task is not None:
            try:
                # While paused, keep hold of the current task and poll the
                # flag again after a short sleep.
                if self.pause_flag.value is True:
                    time.sleep(0.1)
                    continue

                frame_index, image0, image1, (difference_threshold, algorithm) = task

                if image0 is not None:
                    middle_frame = self.interpolate_image(
                        image0, image1, difference_threshold, algorithm
                    )

                    # Only the first pair contributes its leading frame;
                    # every later pair's image0 was stored as the previous
                    # pair's image1.
                    if frame_index == 1:
                        self.processed_frames[0] = image0

                    second_slot = frame_index * 2
                    self.processed_frames[second_slot - 1] = middle_frame
                    self.processed_frames[second_slot] = image1

                # Reached both after a processed pair and after a skipped one.
                task = self.tasks_queue.get()

            except (SystemExit, KeyboardInterrupt):
                break

            except Exception as error:
                logger.exception(error)
                break
|