1807 lines
		
	
	
		
			66 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1807 lines
		
	
	
		
			66 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import torch
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import json
 | |
| import hashlib
 | |
| import traceback
 | |
| import math
 | |
| import time
 | |
| import random
 | |
| 
 | |
| from PIL import Image, ImageOps
 | |
| from PIL.PngImagePlugin import PngInfo
 | |
| import numpy as np
 | |
| import safetensors.torch
 | |
| 
 | |
| sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "fcbh"))
 | |
| 
 | |
| 
 | |
| import fcbh.diffusers_load
 | |
| import fcbh.samplers
 | |
| import fcbh.sample
 | |
| import fcbh.sd
 | |
| import fcbh.utils
 | |
| import fcbh.controlnet
 | |
| 
 | |
| import fcbh.clip_vision
 | |
| 
 | |
| import fcbh.model_management
 | |
| from fcbh.cli_args import args
 | |
| 
 | |
| import importlib
 | |
| 
 | |
| import folder_paths
 | |
| import latent_preview
 | |
| 
 | |
| def before_node_execution():
 | |
|     fcbh.model_management.throw_exception_if_processing_interrupted()
 | |
| 
 | |
| def interrupt_processing(value=True):
 | |
|     fcbh.model_management.interrupt_current_processing(value)
 | |
| 
 | |
| MAX_RESOLUTION=8192
 | |
| 
 | |
| class CLIPTextEncode:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"text": ("STRING", {"multiline": True}), "clip": ("CLIP", )}}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "encode"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def encode(self, clip, text):
 | |
|         tokens = clip.tokenize(text)
 | |
|         cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True)
 | |
|         return ([[cond, {"pooled_output": pooled}]], )
 | |
| 
 | |
| class ConditioningCombine:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "combine"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def combine(self, conditioning_1, conditioning_2):
 | |
|         return (conditioning_1 + conditioning_2, )
 | |
| 
 | |
| class ConditioningAverage :
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
 | |
|                               "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "addWeighted"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
 | |
|         out = []
 | |
| 
 | |
|         if len(conditioning_from) > 1:
 | |
|             print("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
 | |
| 
 | |
|         cond_from = conditioning_from[0][0]
 | |
|         pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
 | |
| 
 | |
|         for i in range(len(conditioning_to)):
 | |
|             t1 = conditioning_to[i][0]
 | |
|             pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
 | |
|             t0 = cond_from[:,:t1.shape[1]]
 | |
|             if t0.shape[1] < t1.shape[1]:
 | |
|                 t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)
 | |
| 
 | |
|             tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
 | |
|             t_to = conditioning_to[i][1].copy()
 | |
|             if pooled_output_from is not None and pooled_output_to is not None:
 | |
|                 t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
 | |
|             elif pooled_output_from is not None:
 | |
|                 t_to["pooled_output"] = pooled_output_from
 | |
| 
 | |
|             n = [tw, t_to]
 | |
|             out.append(n)
 | |
|         return (out, )
 | |
| 
 | |
| class ConditioningConcat:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {
 | |
|             "conditioning_to": ("CONDITIONING",),
 | |
|             "conditioning_from": ("CONDITIONING",),
 | |
|             }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "concat"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def concat(self, conditioning_to, conditioning_from):
 | |
|         out = []
 | |
| 
 | |
|         if len(conditioning_from) > 1:
 | |
|             print("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
 | |
| 
 | |
|         cond_from = conditioning_from[0][0]
 | |
| 
 | |
|         for i in range(len(conditioning_to)):
 | |
|             t1 = conditioning_to[i][0]
 | |
|             tw = torch.cat((t1, cond_from),1)
 | |
|             n = [tw, conditioning_to[i][1].copy()]
 | |
|             out.append(n)
 | |
| 
 | |
|         return (out, )
 | |
| 
 | |
| class ConditioningSetArea:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                               "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "append"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def append(self, conditioning, width, height, x, y, strength):
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             n = [t[0], t[1].copy()]
 | |
|             n[1]['area'] = (height // 8, width // 8, y // 8, x // 8)
 | |
|             n[1]['strength'] = strength
 | |
|             n[1]['set_area_to_bounds'] = False
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class ConditioningSetAreaPercentage:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                               "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
 | |
|                               "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
 | |
|                               "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
 | |
|                               "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
 | |
|                               "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "append"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def append(self, conditioning, width, height, x, y, strength):
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             n = [t[0], t[1].copy()]
 | |
|             n[1]['area'] = ("percentage", height, width, y, x)
 | |
|             n[1]['strength'] = strength
 | |
|             n[1]['set_area_to_bounds'] = False
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class ConditioningSetMask:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                               "mask": ("MASK", ),
 | |
|                               "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
 | |
|                               "set_cond_area": (["default", "mask bounds"],),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "append"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def append(self, conditioning, mask, set_cond_area, strength):
 | |
|         c = []
 | |
|         set_area_to_bounds = False
 | |
|         if set_cond_area != "default":
 | |
|             set_area_to_bounds = True
 | |
|         if len(mask.shape) < 3:
 | |
|             mask = mask.unsqueeze(0)
 | |
|         for t in conditioning:
 | |
|             n = [t[0], t[1].copy()]
 | |
|             _, h, w = mask.shape
 | |
|             n[1]['mask'] = mask
 | |
|             n[1]['set_area_to_bounds'] = set_area_to_bounds
 | |
|             n[1]['mask_strength'] = strength
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class ConditioningZeroOut:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", )}}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "zero_out"
 | |
| 
 | |
|     CATEGORY = "advanced/conditioning"
 | |
| 
 | |
|     def zero_out(self, conditioning):
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             d = t[1].copy()
 | |
|             if "pooled_output" in d:
 | |
|                 d["pooled_output"] = torch.zeros_like(d["pooled_output"])
 | |
|             n = [torch.zeros_like(t[0]), d]
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class ConditioningSetTimestepRange:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                              "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
 | |
|                              "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "set_range"
 | |
| 
 | |
|     CATEGORY = "advanced/conditioning"
 | |
| 
 | |
|     def set_range(self, conditioning, start, end):
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             d = t[1].copy()
 | |
|             d['start_percent'] = 1.0 - start
 | |
|             d['end_percent'] = 1.0 - end
 | |
|             n = [t[0], d]
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class VAEDecode:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}}
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "decode"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     def decode(self, vae, samples):
 | |
|         return (vae.decode(samples["samples"]), )
 | |
| 
 | |
| class VAEDecodeTiled:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ),
 | |
|                              "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64})
 | |
|                             }}
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "decode"
 | |
| 
 | |
|     CATEGORY = "_for_testing"
 | |
| 
 | |
|     def decode(self, vae, samples, tile_size):
 | |
|         return (vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, ), )
 | |
| 
 | |
| class VAEEncode:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "encode"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     @staticmethod
 | |
|     def vae_encode_crop_pixels(pixels):
 | |
|         x = (pixels.shape[1] // 8) * 8
 | |
|         y = (pixels.shape[2] // 8) * 8
 | |
|         if pixels.shape[1] != x or pixels.shape[2] != y:
 | |
|             x_offset = (pixels.shape[1] % 8) // 2
 | |
|             y_offset = (pixels.shape[2] % 8) // 2
 | |
|             pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
 | |
|         return pixels
 | |
| 
 | |
|     def encode(self, vae, pixels):
 | |
|         pixels = self.vae_encode_crop_pixels(pixels)
 | |
|         t = vae.encode(pixels[:,:,:,:3])
 | |
|         return ({"samples":t}, )
 | |
| 
 | |
| class VAEEncodeTiled:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ),
 | |
|                              "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64})
 | |
|                             }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "encode"
 | |
| 
 | |
|     CATEGORY = "_for_testing"
 | |
| 
 | |
|     def encode(self, vae, pixels, tile_size):
 | |
|         pixels = VAEEncode.vae_encode_crop_pixels(pixels)
 | |
|         t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, )
 | |
|         return ({"samples":t}, )
 | |
| 
 | |
| class VAEEncodeForInpaint:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "encode"
 | |
| 
 | |
|     CATEGORY = "latent/inpaint"
 | |
| 
 | |
|     def encode(self, vae, pixels, mask, grow_mask_by=6):
 | |
|         x = (pixels.shape[1] // 8) * 8
 | |
|         y = (pixels.shape[2] // 8) * 8
 | |
|         mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")
 | |
| 
 | |
|         pixels = pixels.clone()
 | |
|         if pixels.shape[1] != x or pixels.shape[2] != y:
 | |
|             x_offset = (pixels.shape[1] % 8) // 2
 | |
|             y_offset = (pixels.shape[2] % 8) // 2
 | |
|             pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
 | |
|             mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]
 | |
| 
 | |
|         #grow mask by a few pixels to keep things seamless in latent space
 | |
|         if grow_mask_by == 0:
 | |
|             mask_erosion = mask
 | |
|         else:
 | |
|             kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
 | |
|             padding = math.ceil((grow_mask_by - 1) / 2)
 | |
| 
 | |
|             mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)
 | |
| 
 | |
|         m = (1.0 - mask.round()).squeeze(1)
 | |
|         for i in range(3):
 | |
|             pixels[:,:,:,i] -= 0.5
 | |
|             pixels[:,:,:,i] *= m
 | |
|             pixels[:,:,:,i] += 0.5
 | |
|         t = vae.encode(pixels)
 | |
| 
 | |
|         return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, )
 | |
| 
 | |
| class SaveLatent:
 | |
|     def __init__(self):
 | |
|         self.output_dir = folder_paths.get_output_directory()
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT", ),
 | |
|                               "filename_prefix": ("STRING", {"default": "latents/fcbh_backend"})},
 | |
|                 "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
 | |
|                 }
 | |
|     RETURN_TYPES = ()
 | |
|     FUNCTION = "save"
 | |
| 
 | |
|     OUTPUT_NODE = True
 | |
| 
 | |
|     CATEGORY = "_for_testing"
 | |
| 
 | |
|     def save(self, samples, filename_prefix="fcbh_backend", prompt=None, extra_pnginfo=None):
 | |
|         full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)
 | |
| 
 | |
|         # support save metadata for latent sharing
 | |
|         prompt_info = ""
 | |
|         if prompt is not None:
 | |
|             prompt_info = json.dumps(prompt)
 | |
| 
 | |
|         metadata = None
 | |
|         if not args.disable_metadata:
 | |
|             metadata = {"prompt": prompt_info}
 | |
|             if extra_pnginfo is not None:
 | |
|                 for x in extra_pnginfo:
 | |
|                     metadata[x] = json.dumps(extra_pnginfo[x])
 | |
| 
 | |
|         file = f"{filename}_{counter:05}_.latent"
 | |
| 
 | |
|         results = list()
 | |
|         results.append({
 | |
|             "filename": file,
 | |
|             "subfolder": subfolder,
 | |
|             "type": "output"
 | |
|         })
 | |
| 
 | |
|         file = os.path.join(full_output_folder, file)
 | |
| 
 | |
|         output = {}
 | |
|         output["latent_tensor"] = samples["samples"]
 | |
|         output["latent_format_version_0"] = torch.tensor([])
 | |
| 
 | |
|         fcbh.utils.save_torch_file(output, file, metadata=metadata)
 | |
|         return { "ui": { "latents": results } }
 | |
| 
 | |
| 
 | |
| class LoadLatent:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         input_dir = folder_paths.get_input_directory()
 | |
|         files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
 | |
|         return {"required": {"latent": [sorted(files), ]}, }
 | |
| 
 | |
|     CATEGORY = "_for_testing"
 | |
| 
 | |
|     RETURN_TYPES = ("LATENT", )
 | |
|     FUNCTION = "load"
 | |
| 
 | |
|     def load(self, latent):
 | |
|         latent_path = folder_paths.get_annotated_filepath(latent)
 | |
|         latent = safetensors.torch.load_file(latent_path, device="cpu")
 | |
|         multiplier = 1.0
 | |
|         if "latent_format_version_0" not in latent:
 | |
|             multiplier = 1.0 / 0.18215
 | |
|         samples = {"samples": latent["latent_tensor"].float() * multiplier}
 | |
|         return (samples, )
 | |
| 
 | |
|     @classmethod
 | |
|     def IS_CHANGED(s, latent):
 | |
|         image_path = folder_paths.get_annotated_filepath(latent)
 | |
|         m = hashlib.sha256()
 | |
|         with open(image_path, 'rb') as f:
 | |
|             m.update(f.read())
 | |
|         return m.digest().hex()
 | |
| 
 | |
|     @classmethod
 | |
|     def VALIDATE_INPUTS(s, latent):
 | |
|         if not folder_paths.exists_annotated_filepath(latent):
 | |
|             return "Invalid latent file: {}".format(latent)
 | |
|         return True
 | |
| 
 | |
| 
 | |
| class CheckpointLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ),
 | |
|                               "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}}
 | |
|     RETURN_TYPES = ("MODEL", "CLIP", "VAE")
 | |
|     FUNCTION = "load_checkpoint"
 | |
| 
 | |
|     CATEGORY = "advanced/loaders"
 | |
| 
 | |
|     def load_checkpoint(self, config_name, ckpt_name, output_vae=True, output_clip=True):
 | |
|         config_path = folder_paths.get_full_path("configs", config_name)
 | |
|         ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
 | |
|         return fcbh.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
| 
 | |
| class CheckpointLoaderSimple:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("MODEL", "CLIP", "VAE")
 | |
|     FUNCTION = "load_checkpoint"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
 | |
|         ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
 | |
|         out = fcbh.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
|         return out[:3]
 | |
| 
 | |
| class DiffusersLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(cls):
 | |
|         paths = []
 | |
|         for search_path in folder_paths.get_folder_paths("diffusers"):
 | |
|             if os.path.exists(search_path):
 | |
|                 for root, subdir, files in os.walk(search_path, followlinks=True):
 | |
|                     if "model_index.json" in files:
 | |
|                         paths.append(os.path.relpath(root, start=search_path))
 | |
| 
 | |
|         return {"required": {"model_path": (paths,), }}
 | |
|     RETURN_TYPES = ("MODEL", "CLIP", "VAE")
 | |
|     FUNCTION = "load_checkpoint"
 | |
| 
 | |
|     CATEGORY = "advanced/loaders/deprecated"
 | |
| 
 | |
|     def load_checkpoint(self, model_path, output_vae=True, output_clip=True):
 | |
|         for search_path in folder_paths.get_folder_paths("diffusers"):
 | |
|             if os.path.exists(search_path):
 | |
|                 path = os.path.join(search_path, model_path)
 | |
|                 if os.path.exists(path):
 | |
|                     model_path = path
 | |
|                     break
 | |
| 
 | |
|         return fcbh.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
| 
 | |
| 
 | |
| class unCLIPCheckpointLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
 | |
|     FUNCTION = "load_checkpoint"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
 | |
|         ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
 | |
|         out = fcbh.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
|         return out
 | |
| 
 | |
| class CLIPSetLastLayer:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "clip": ("CLIP", ),
 | |
|                               "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("CLIP",)
 | |
|     FUNCTION = "set_last_layer"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def set_last_layer(self, clip, stop_at_clip_layer):
 | |
|         clip = clip.clone()
 | |
|         clip.clip_layer(stop_at_clip_layer)
 | |
|         return (clip,)
 | |
| 
 | |
| class LoraLoader:
 | |
|     def __init__(self):
 | |
|         self.loaded_lora = None
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "model": ("MODEL",),
 | |
|                               "clip": ("CLIP", ),
 | |
|                               "lora_name": (folder_paths.get_filename_list("loras"), ),
 | |
|                               "strength_model": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
 | |
|                               "strength_clip": ("FLOAT", {"default": 1.0, "min": -20.0, "max": 20.0, "step": 0.01}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("MODEL", "CLIP")
 | |
|     FUNCTION = "load_lora"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
 | |
|         if strength_model == 0 and strength_clip == 0:
 | |
|             return (model, clip)
 | |
| 
 | |
|         lora_path = folder_paths.get_full_path("loras", lora_name)
 | |
|         lora = None
 | |
|         if self.loaded_lora is not None:
 | |
|             if self.loaded_lora[0] == lora_path:
 | |
|                 lora = self.loaded_lora[1]
 | |
|             else:
 | |
|                 temp = self.loaded_lora
 | |
|                 self.loaded_lora = None
 | |
|                 del temp
 | |
| 
 | |
|         if lora is None:
 | |
|             lora = fcbh.utils.load_torch_file(lora_path, safe_load=True)
 | |
|             self.loaded_lora = (lora_path, lora)
 | |
| 
 | |
|         model_lora, clip_lora = fcbh.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
 | |
|         return (model_lora, clip_lora)
 | |
| 
 | |
| class VAELoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "vae_name": (folder_paths.get_filename_list("vae"), )}}
 | |
|     RETURN_TYPES = ("VAE",)
 | |
|     FUNCTION = "load_vae"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     #TODO: scale factor?
 | |
|     def load_vae(self, vae_name):
 | |
|         vae_path = folder_paths.get_full_path("vae", vae_name)
 | |
|         sd = fcbh.utils.load_torch_file(vae_path)
 | |
|         vae = fcbh.sd.VAE(sd=sd)
 | |
|         return (vae,)
 | |
| 
 | |
| class ControlNetLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
 | |
| 
 | |
|     RETURN_TYPES = ("CONTROL_NET",)
 | |
|     FUNCTION = "load_controlnet"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_controlnet(self, control_net_name):
 | |
|         controlnet_path = folder_paths.get_full_path("controlnet", control_net_name)
 | |
|         controlnet = fcbh.controlnet.load_controlnet(controlnet_path)
 | |
|         return (controlnet,)
 | |
| 
 | |
| class DiffControlNetLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "model": ("MODEL",),
 | |
|                               "control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
 | |
| 
 | |
|     RETURN_TYPES = ("CONTROL_NET",)
 | |
|     FUNCTION = "load_controlnet"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_controlnet(self, model, control_net_name):
 | |
|         controlnet_path = folder_paths.get_full_path("controlnet", control_net_name)
 | |
|         controlnet = fcbh.controlnet.load_controlnet(controlnet_path, model)
 | |
|         return (controlnet,)
 | |
| 
 | |
| 
 | |
| class ControlNetApply:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                              "control_net": ("CONTROL_NET", ),
 | |
|                              "image": ("IMAGE", ),
 | |
|                              "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01})
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "apply_controlnet"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def apply_controlnet(self, conditioning, control_net, image, strength):
 | |
|         if strength == 0:
 | |
|             return (conditioning, )
 | |
| 
 | |
|         c = []
 | |
|         control_hint = image.movedim(-1,1)
 | |
|         for t in conditioning:
 | |
|             n = [t[0], t[1].copy()]
 | |
|             c_net = control_net.copy().set_cond_hint(control_hint, strength)
 | |
|             if 'control' in t[1]:
 | |
|                 c_net.set_previous_controlnet(t[1]['control'])
 | |
|             n[1]['control'] = c_net
 | |
|             n[1]['control_apply_to_uncond'] = True
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| 
 | |
| class ControlNetApplyAdvanced:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"positive": ("CONDITIONING", ),
 | |
|                              "negative": ("CONDITIONING", ),
 | |
|                              "control_net": ("CONTROL_NET", ),
 | |
|                              "image": ("IMAGE", ),
 | |
|                              "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
 | |
|                              "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
 | |
|                              "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
 | |
|                              }}
 | |
| 
 | |
|     RETURN_TYPES = ("CONDITIONING","CONDITIONING")
 | |
|     RETURN_NAMES = ("positive", "negative")
 | |
|     FUNCTION = "apply_controlnet"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent):
 | |
|         if strength == 0:
 | |
|             return (positive, negative)
 | |
| 
 | |
|         control_hint = image.movedim(-1,1)
 | |
|         cnets = {}
 | |
| 
 | |
|         out = []
 | |
|         for conditioning in [positive, negative]:
 | |
|             c = []
 | |
|             for t in conditioning:
 | |
|                 d = t[1].copy()
 | |
| 
 | |
|                 prev_cnet = d.get('control', None)
 | |
|                 if prev_cnet in cnets:
 | |
|                     c_net = cnets[prev_cnet]
 | |
|                 else:
 | |
|                     c_net = control_net.copy().set_cond_hint(control_hint, strength, (1.0 - start_percent, 1.0 - end_percent))
 | |
|                     c_net.set_previous_controlnet(prev_cnet)
 | |
|                     cnets[prev_cnet] = c_net
 | |
| 
 | |
|                 d['control'] = c_net
 | |
|                 d['control_apply_to_uncond'] = False
 | |
|                 n = [t[0], d]
 | |
|                 c.append(n)
 | |
|             out.append(c)
 | |
|         return (out[0], out[1])
 | |
| 
 | |
| 
 | |
| class UNETLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "unet_name": (folder_paths.get_filename_list("unet"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("MODEL",)
 | |
|     FUNCTION = "load_unet"
 | |
| 
 | |
|     CATEGORY = "advanced/loaders"
 | |
| 
 | |
|     def load_unet(self, unet_name):
 | |
|         unet_path = folder_paths.get_full_path("unet", unet_name)
 | |
|         model = fcbh.sd.load_unet(unet_path)
 | |
|         return (model,)
 | |
| 
 | |
| class CLIPLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "clip_name": (folder_paths.get_filename_list("clip"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CLIP",)
 | |
|     FUNCTION = "load_clip"
 | |
| 
 | |
|     CATEGORY = "advanced/loaders"
 | |
| 
 | |
|     def load_clip(self, clip_name):
 | |
|         clip_path = folder_paths.get_full_path("clip", clip_name)
 | |
|         clip = fcbh.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
|         return (clip,)
 | |
| 
 | |
| class DualCLIPLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "clip_name1": (folder_paths.get_filename_list("clip"), ), "clip_name2": (folder_paths.get_filename_list("clip"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CLIP",)
 | |
|     FUNCTION = "load_clip"
 | |
| 
 | |
|     CATEGORY = "advanced/loaders"
 | |
| 
 | |
|     def load_clip(self, clip_name1, clip_name2):
 | |
|         clip_path1 = folder_paths.get_full_path("clip", clip_name1)
 | |
|         clip_path2 = folder_paths.get_full_path("clip", clip_name2)
 | |
|         clip = fcbh.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"))
 | |
|         return (clip,)
 | |
| 
 | |
| class CLIPVisionLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CLIP_VISION",)
 | |
|     FUNCTION = "load_clip"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_clip(self, clip_name):
 | |
|         clip_path = folder_paths.get_full_path("clip_vision", clip_name)
 | |
|         clip_vision = fcbh.clip_vision.load(clip_path)
 | |
|         return (clip_vision,)
 | |
| 
 | |
| class CLIPVisionEncode:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "clip_vision": ("CLIP_VISION",),
 | |
|                               "image": ("IMAGE",)
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
 | |
|     FUNCTION = "encode"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def encode(self, clip_vision, image):
 | |
|         output = clip_vision.encode_image(image)
 | |
|         return (output,)
 | |
| 
 | |
| class StyleModelLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}}
 | |
| 
 | |
|     RETURN_TYPES = ("STYLE_MODEL",)
 | |
|     FUNCTION = "load_style_model"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_style_model(self, style_model_name):
 | |
|         style_model_path = folder_paths.get_full_path("style_models", style_model_name)
 | |
|         style_model = fcbh.sd.load_style_model(style_model_path)
 | |
|         return (style_model,)
 | |
| 
 | |
| 
 | |
| class StyleModelApply:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                              "style_model": ("STYLE_MODEL", ),
 | |
|                              "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "apply_stylemodel"
 | |
| 
 | |
|     CATEGORY = "conditioning/style_model"
 | |
| 
 | |
|     def apply_stylemodel(self, clip_vision_output, style_model, conditioning):
 | |
|         cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             n = [torch.cat((t[0], cond), dim=1), t[1].copy()]
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class unCLIPConditioning:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning": ("CONDITIONING", ),
 | |
|                              "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
 | |
|                              "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
 | |
|                              "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "apply_adm"
 | |
| 
 | |
|     CATEGORY = "conditioning"
 | |
| 
 | |
|     def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
 | |
|         if strength == 0:
 | |
|             return (conditioning, )
 | |
| 
 | |
|         c = []
 | |
|         for t in conditioning:
 | |
|             o = t[1].copy()
 | |
|             x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}
 | |
|             if "unclip_conditioning" in o:
 | |
|                 o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x]
 | |
|             else:
 | |
|                 o["unclip_conditioning"] = [x]
 | |
|             n = [t[0], o]
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class GLIGENLoader:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}}
 | |
| 
 | |
|     RETURN_TYPES = ("GLIGEN",)
 | |
|     FUNCTION = "load_gligen"
 | |
| 
 | |
|     CATEGORY = "loaders"
 | |
| 
 | |
|     def load_gligen(self, gligen_name):
 | |
|         gligen_path = folder_paths.get_full_path("gligen", gligen_name)
 | |
|         gligen = fcbh.sd.load_gligen(gligen_path)
 | |
|         return (gligen,)
 | |
| 
 | |
| class GLIGENTextBoxApply:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {"conditioning_to": ("CONDITIONING", ),
 | |
|                               "clip": ("CLIP", ),
 | |
|                               "gligen_textbox_model": ("GLIGEN", ),
 | |
|                               "text": ("STRING", {"multiline": True}),
 | |
|                               "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                              }}
 | |
|     RETURN_TYPES = ("CONDITIONING",)
 | |
|     FUNCTION = "append"
 | |
| 
 | |
|     CATEGORY = "conditioning/gligen"
 | |
| 
 | |
|     def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
 | |
|         c = []
 | |
|         cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled=True)
 | |
|         for t in conditioning_to:
 | |
|             n = [t[0], t[1].copy()]
 | |
|             position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
 | |
|             prev = []
 | |
|             if "gligen" in n[1]:
 | |
|                 prev = n[1]['gligen'][2]
 | |
| 
 | |
|             n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
 | |
|             c.append(n)
 | |
|         return (c, )
 | |
| 
 | |
| class EmptyLatentImage:
 | |
|     def __init__(self, device="cpu"):
 | |
|         self.device = device
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096})}}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "generate"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     def generate(self, width, height, batch_size=1):
 | |
|         latent = torch.zeros([batch_size, 4, height // 8, width // 8])
 | |
|         return ({"samples":latent}, )
 | |
| 
 | |
| 
 | |
| class LatentFromBatch:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
 | |
|                               "length": ("INT", {"default": 1, "min": 1, "max": 64}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "frombatch"
 | |
| 
 | |
|     CATEGORY = "latent/batch"
 | |
| 
 | |
|     def frombatch(self, samples, batch_index, length):
 | |
|         s = samples.copy()
 | |
|         s_in = samples["samples"]
 | |
|         batch_index = min(s_in.shape[0] - 1, batch_index)
 | |
|         length = min(s_in.shape[0] - batch_index, length)
 | |
|         s["samples"] = s_in[batch_index:batch_index + length].clone()
 | |
|         if "noise_mask" in samples:
 | |
|             masks = samples["noise_mask"]
 | |
|             if masks.shape[0] == 1:
 | |
|                 s["noise_mask"] = masks.clone()
 | |
|             else:
 | |
|                 if masks.shape[0] < s_in.shape[0]:
 | |
|                     masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
 | |
|                 s["noise_mask"] = masks[batch_index:batch_index + length].clone()
 | |
|         if "batch_index" not in s:
 | |
|             s["batch_index"] = [x for x in range(batch_index, batch_index+length)]
 | |
|         else:
 | |
|             s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
 | |
|         return (s,)
 | |
|     
 | |
| class RepeatLatentBatch:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "amount": ("INT", {"default": 1, "min": 1, "max": 64}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "repeat"
 | |
| 
 | |
|     CATEGORY = "latent/batch"
 | |
| 
 | |
|     def repeat(self, samples, amount):
 | |
|         s = samples.copy()
 | |
|         s_in = samples["samples"]
 | |
|         
 | |
|         s["samples"] = s_in.repeat((amount, 1,1,1))
 | |
|         if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
 | |
|             masks = samples["noise_mask"]
 | |
|             if masks.shape[0] < s_in.shape[0]:
 | |
|                 masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
 | |
|             s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1))
 | |
|         if "batch_index" in s:
 | |
|             offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
 | |
|             s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
 | |
|         return (s,)
 | |
| 
 | |
| class LatentUpscale:
 | |
|     upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
 | |
|     crop_methods = ["disabled", "center"]
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
 | |
|                               "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "crop": (s.crop_methods,)}}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "upscale"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     def upscale(self, samples, upscale_method, width, height, crop):
 | |
|         if width == 0 and height == 0:
 | |
|             s = samples
 | |
|         else:
 | |
|             s = samples.copy()
 | |
| 
 | |
|             if width == 0:
 | |
|                 height = max(64, height)
 | |
|                 width = max(64, round(samples["samples"].shape[3] * height / samples["samples"].shape[2]))
 | |
|             elif height == 0:
 | |
|                 width = max(64, width)
 | |
|                 height = max(64, round(samples["samples"].shape[2] * width / samples["samples"].shape[3]))
 | |
|             else:
 | |
|                 width = max(64, width)
 | |
|                 height = max(64, height)
 | |
| 
 | |
|             s["samples"] = fcbh.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
 | |
|         return (s,)
 | |
| 
 | |
| class LatentUpscaleBy:
 | |
|     upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
 | |
|                               "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "upscale"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     def upscale(self, samples, upscale_method, scale_by):
 | |
|         s = samples.copy()
 | |
|         width = round(samples["samples"].shape[3] * scale_by)
 | |
|         height = round(samples["samples"].shape[2] * scale_by)
 | |
|         s["samples"] = fcbh.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
 | |
|         return (s,)
 | |
| 
 | |
| class LatentRotate:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "rotate"
 | |
| 
 | |
|     CATEGORY = "latent/transform"
 | |
| 
 | |
|     def rotate(self, samples, rotation):
 | |
|         s = samples.copy()
 | |
|         rotate_by = 0
 | |
|         if rotation.startswith("90"):
 | |
|             rotate_by = 1
 | |
|         elif rotation.startswith("180"):
 | |
|             rotate_by = 2
 | |
|         elif rotation.startswith("270"):
 | |
|             rotate_by = 3
 | |
| 
 | |
|         s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
 | |
|         return (s,)
 | |
| 
 | |
| class LatentFlip:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "flip"
 | |
| 
 | |
|     CATEGORY = "latent/transform"
 | |
| 
 | |
|     def flip(self, samples, flip_method):
 | |
|         s = samples.copy()
 | |
|         if flip_method.startswith("x"):
 | |
|             s["samples"] = torch.flip(samples["samples"], dims=[2])
 | |
|         elif flip_method.startswith("y"):
 | |
|             s["samples"] = torch.flip(samples["samples"], dims=[3])
 | |
| 
 | |
|         return (s,)
 | |
| 
 | |
| class LatentComposite:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples_to": ("LATENT",),
 | |
|                               "samples_from": ("LATENT",),
 | |
|                               "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "composite"
 | |
| 
 | |
|     CATEGORY = "latent"
 | |
| 
 | |
|     def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
 | |
|         x =  x // 8
 | |
|         y = y // 8
 | |
|         feather = feather // 8
 | |
|         samples_out = samples_to.copy()
 | |
|         s = samples_to["samples"].clone()
 | |
|         samples_to = samples_to["samples"]
 | |
|         samples_from = samples_from["samples"]
 | |
|         if feather == 0:
 | |
|             s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
 | |
|         else:
 | |
|             samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
 | |
|             mask = torch.ones_like(samples_from)
 | |
|             for t in range(feather):
 | |
|                 if y != 0:
 | |
|                     mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1))
 | |
| 
 | |
|                 if y + samples_from.shape[2] < samples_to.shape[2]:
 | |
|                     mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1))
 | |
|                 if x != 0:
 | |
|                     mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1))
 | |
|                 if x + samples_from.shape[3] < samples_to.shape[3]:
 | |
|                     mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1))
 | |
|             rev_mask = torch.ones_like(mask) - mask
 | |
|             s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask
 | |
|         samples_out["samples"] = s
 | |
|         return (samples_out,)
 | |
| 
 | |
| class LatentBlend:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": {
 | |
|             "samples1": ("LATENT",),
 | |
|             "samples2": ("LATENT",),
 | |
|             "blend_factor": ("FLOAT", {
 | |
|                 "default": 0.5,
 | |
|                 "min": 0,
 | |
|                 "max": 1,
 | |
|                 "step": 0.01
 | |
|             }),
 | |
|         }}
 | |
| 
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "blend"
 | |
| 
 | |
|     CATEGORY = "_for_testing"
 | |
| 
 | |
|     def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"):
 | |
| 
 | |
|         samples_out = samples1.copy()
 | |
|         samples1 = samples1["samples"]
 | |
|         samples2 = samples2["samples"]
 | |
| 
 | |
|         if samples1.shape != samples2.shape:
 | |
|             samples2.permute(0, 3, 1, 2)
 | |
|             samples2 = fcbh.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')
 | |
|             samples2.permute(0, 2, 3, 1)
 | |
| 
 | |
|         samples_blended = self.blend_mode(samples1, samples2, blend_mode)
 | |
|         samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
 | |
|         samples_out["samples"] = samples_blended
 | |
|         return (samples_out,)
 | |
| 
 | |
|     def blend_mode(self, img1, img2, mode):
 | |
|         if mode == "normal":
 | |
|             return img2
 | |
|         else:
 | |
|             raise ValueError(f"Unsupported blend mode: {mode}")
 | |
| 
 | |
| class LatentCrop:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "crop"
 | |
| 
 | |
|     CATEGORY = "latent/transform"
 | |
| 
 | |
|     def crop(self, samples, width, height, x, y):
 | |
|         s = samples.copy()
 | |
|         samples = samples['samples']
 | |
|         x =  x // 8
 | |
|         y = y // 8
 | |
| 
 | |
|         #enfonce minimum size of 64
 | |
|         if x > (samples.shape[3] - 8):
 | |
|             x = samples.shape[3] - 8
 | |
|         if y > (samples.shape[2] - 8):
 | |
|             y = samples.shape[2] - 8
 | |
| 
 | |
|         new_height = height // 8
 | |
|         new_width = width // 8
 | |
|         to_x = new_width + x
 | |
|         to_y = new_height + y
 | |
|         s['samples'] = samples[:,:,y:to_y, x:to_x]
 | |
|         return (s,)
 | |
| 
 | |
| class SetLatentNoiseMask:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "samples": ("LATENT",),
 | |
|                               "mask": ("MASK",),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "set_mask"
 | |
| 
 | |
|     CATEGORY = "latent/inpaint"
 | |
| 
 | |
|     def set_mask(self, samples, mask):
 | |
|         s = samples.copy()
 | |
|         s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
 | |
|         return (s,)
 | |
| 
 | |
| def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
 | |
|     latent_image = latent["samples"]
 | |
|     if disable_noise:
 | |
|         noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
 | |
|     else:
 | |
|         batch_inds = latent["batch_index"] if "batch_index" in latent else None
 | |
|         noise = fcbh.sample.prepare_noise(latent_image, seed, batch_inds)
 | |
| 
 | |
|     noise_mask = None
 | |
|     if "noise_mask" in latent:
 | |
|         noise_mask = latent["noise_mask"]
 | |
| 
 | |
|     callback = latent_preview.prepare_callback(model, steps)
 | |
|     disable_pbar = not fcbh.utils.PROGRESS_BAR_ENABLED
 | |
|     samples = fcbh.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
 | |
|                                   denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
 | |
|                                   force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed)
 | |
|     out = latent.copy()
 | |
|     out["samples"] = samples
 | |
|     return (out, )
 | |
| 
 | |
| class KSampler:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required":
 | |
|                     {"model": ("MODEL",),
 | |
|                     "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
 | |
|                     "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
 | |
|                     "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.5, "round": 0.01}),
 | |
|                     "sampler_name": (fcbh.samplers.KSampler.SAMPLERS, ),
 | |
|                     "scheduler": (fcbh.samplers.KSampler.SCHEDULERS, ),
 | |
|                     "positive": ("CONDITIONING", ),
 | |
|                     "negative": ("CONDITIONING", ),
 | |
|                     "latent_image": ("LATENT", ),
 | |
|                     "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
 | |
|                      }
 | |
|                 }
 | |
| 
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "sample"
 | |
| 
 | |
|     CATEGORY = "sampling"
 | |
| 
 | |
|     def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
 | |
|         return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)
 | |
| 
 | |
| class KSamplerAdvanced:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required":
 | |
|                     {"model": ("MODEL",),
 | |
|                     "add_noise": (["enable", "disable"], ),
 | |
|                     "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
 | |
|                     "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
 | |
|                     "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.5, "round": 0.01}),
 | |
|                     "sampler_name": (fcbh.samplers.KSampler.SAMPLERS, ),
 | |
|                     "scheduler": (fcbh.samplers.KSampler.SCHEDULERS, ),
 | |
|                     "positive": ("CONDITIONING", ),
 | |
|                     "negative": ("CONDITIONING", ),
 | |
|                     "latent_image": ("LATENT", ),
 | |
|                     "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
 | |
|                     "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
 | |
|                     "return_with_leftover_noise": (["disable", "enable"], ),
 | |
|                      }
 | |
|                 }
 | |
| 
 | |
|     RETURN_TYPES = ("LATENT",)
 | |
|     FUNCTION = "sample"
 | |
| 
 | |
|     CATEGORY = "sampling"
 | |
| 
 | |
|     def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
 | |
|         force_full_denoise = True
 | |
|         if return_with_leftover_noise == "enable":
 | |
|             force_full_denoise = False
 | |
|         disable_noise = False
 | |
|         if add_noise == "disable":
 | |
|             disable_noise = True
 | |
|         return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)
 | |
| 
 | |
| class SaveImage:
 | |
|     def __init__(self):
 | |
|         self.output_dir = folder_paths.get_output_directory()
 | |
|         self.type = "output"
 | |
|         self.prefix_append = ""
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": 
 | |
|                     {"images": ("IMAGE", ),
 | |
|                      "filename_prefix": ("STRING", {"default": "fcbh_backend"})},
 | |
|                 "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
 | |
|                 }
 | |
| 
 | |
|     RETURN_TYPES = ()
 | |
|     FUNCTION = "save_images"
 | |
| 
 | |
|     OUTPUT_NODE = True
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     def save_images(self, images, filename_prefix="fcbh_backend", prompt=None, extra_pnginfo=None):
 | |
|         filename_prefix += self.prefix_append
 | |
|         full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0])
 | |
|         results = list()
 | |
|         for image in images:
 | |
|             i = 255. * image.cpu().numpy()
 | |
|             img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
 | |
|             metadata = None
 | |
|             if not args.disable_metadata:
 | |
|                 metadata = PngInfo()
 | |
|                 if prompt is not None:
 | |
|                     metadata.add_text("prompt", json.dumps(prompt))
 | |
|                 if extra_pnginfo is not None:
 | |
|                     for x in extra_pnginfo:
 | |
|                         metadata.add_text(x, json.dumps(extra_pnginfo[x]))
 | |
| 
 | |
|             file = f"{filename}_{counter:05}_.png"
 | |
|             img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=4)
 | |
|             results.append({
 | |
|                 "filename": file,
 | |
|                 "subfolder": subfolder,
 | |
|                 "type": self.type
 | |
|             })
 | |
|             counter += 1
 | |
| 
 | |
|         return { "ui": { "images": results } }
 | |
| 
 | |
| class PreviewImage(SaveImage):
 | |
|     def __init__(self):
 | |
|         self.output_dir = folder_paths.get_temp_directory()
 | |
|         self.type = "temp"
 | |
|         self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5))
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required":
 | |
|                     {"images": ("IMAGE", ), },
 | |
|                 "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
 | |
|                 }
 | |
| 
 | |
| class LoadImage:
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         input_dir = folder_paths.get_input_directory()
 | |
|         files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
 | |
|         return {"required":
 | |
|                     {"image": (sorted(files), {"image_upload": True})},
 | |
|                 }
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     RETURN_TYPES = ("IMAGE", "MASK")
 | |
|     FUNCTION = "load_image"
 | |
|     def load_image(self, image):
 | |
|         image_path = folder_paths.get_annotated_filepath(image)
 | |
|         i = Image.open(image_path)
 | |
|         i = ImageOps.exif_transpose(i)
 | |
|         image = i.convert("RGB")
 | |
|         image = np.array(image).astype(np.float32) / 255.0
 | |
|         image = torch.from_numpy(image)[None,]
 | |
|         if 'A' in i.getbands():
 | |
|             mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
 | |
|             mask = 1. - torch.from_numpy(mask)
 | |
|         else:
 | |
|             mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
 | |
|         return (image, mask.unsqueeze(0))
 | |
| 
 | |
|     @classmethod
 | |
|     def IS_CHANGED(s, image):
 | |
|         image_path = folder_paths.get_annotated_filepath(image)
 | |
|         m = hashlib.sha256()
 | |
|         with open(image_path, 'rb') as f:
 | |
|             m.update(f.read())
 | |
|         return m.digest().hex()
 | |
| 
 | |
|     @classmethod
 | |
|     def VALIDATE_INPUTS(s, image):
 | |
|         if not folder_paths.exists_annotated_filepath(image):
 | |
|             return "Invalid image file: {}".format(image)
 | |
| 
 | |
|         return True
 | |
| 
 | |
| class LoadImageMask:
 | |
|     _color_channels = ["alpha", "red", "green", "blue"]
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         input_dir = folder_paths.get_input_directory()
 | |
|         files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
 | |
|         return {"required":
 | |
|                     {"image": (sorted(files), {"image_upload": True}),
 | |
|                      "channel": (s._color_channels, ), }
 | |
|                 }
 | |
| 
 | |
|     CATEGORY = "mask"
 | |
| 
 | |
|     RETURN_TYPES = ("MASK",)
 | |
|     FUNCTION = "load_image"
 | |
|     def load_image(self, image, channel):
 | |
|         image_path = folder_paths.get_annotated_filepath(image)
 | |
|         i = Image.open(image_path)
 | |
|         i = ImageOps.exif_transpose(i)
 | |
|         if i.getbands() != ("R", "G", "B", "A"):
 | |
|             i = i.convert("RGBA")
 | |
|         mask = None
 | |
|         c = channel[0].upper()
 | |
|         if c in i.getbands():
 | |
|             mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
 | |
|             mask = torch.from_numpy(mask)
 | |
|             if c == 'A':
 | |
|                 mask = 1. - mask
 | |
|         else:
 | |
|             mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
 | |
|         return (mask.unsqueeze(0),)
 | |
| 
 | |
|     @classmethod
 | |
|     def IS_CHANGED(s, image, channel):
 | |
|         image_path = folder_paths.get_annotated_filepath(image)
 | |
|         m = hashlib.sha256()
 | |
|         with open(image_path, 'rb') as f:
 | |
|             m.update(f.read())
 | |
|         return m.digest().hex()
 | |
| 
 | |
|     @classmethod
 | |
|     def VALIDATE_INPUTS(s, image, channel):
 | |
|         if not folder_paths.exists_annotated_filepath(image):
 | |
|             return "Invalid image file: {}".format(image)
 | |
| 
 | |
|         if channel not in s._color_channels:
 | |
|             return "Invalid color channel: {}".format(channel)
 | |
| 
 | |
|         return True
 | |
| 
 | |
| class ImageScale:
 | |
|     upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
 | |
|     crop_methods = ["disabled", "center"]
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
 | |
|                               "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
 | |
|                               "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
 | |
|                               "crop": (s.crop_methods,)}}
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "upscale"
 | |
| 
 | |
|     CATEGORY = "image/upscaling"
 | |
| 
 | |
|     def upscale(self, image, upscale_method, width, height, crop):
 | |
|         if width == 0 and height == 0:
 | |
|             s = image
 | |
|         else:
 | |
|             samples = image.movedim(-1,1)
 | |
| 
 | |
|             if width == 0:
 | |
|                 width = max(1, round(samples.shape[3] * height / samples.shape[2]))
 | |
|             elif height == 0:
 | |
|                 height = max(1, round(samples.shape[2] * width / samples.shape[3]))
 | |
| 
 | |
|             s = fcbh.utils.common_upscale(samples, width, height, upscale_method, crop)
 | |
|             s = s.movedim(1,-1)
 | |
|         return (s,)
 | |
| 
 | |
| class ImageScaleBy:
 | |
|     upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
 | |
|                               "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}}
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "upscale"
 | |
| 
 | |
|     CATEGORY = "image/upscaling"
 | |
| 
 | |
|     def upscale(self, image, upscale_method, scale_by):
 | |
|         samples = image.movedim(-1,1)
 | |
|         width = round(samples.shape[3] * scale_by)
 | |
|         height = round(samples.shape[2] * scale_by)
 | |
|         s = fcbh.utils.common_upscale(samples, width, height, upscale_method, "disabled")
 | |
|         s = s.movedim(1,-1)
 | |
|         return (s,)
 | |
| 
 | |
| class ImageInvert:
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "image": ("IMAGE",)}}
 | |
| 
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "invert"
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     def invert(self, image):
 | |
|         s = 1.0 - image
 | |
|         return (s,)
 | |
| 
 | |
| class ImageBatch:
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}}
 | |
| 
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "batch"
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     def batch(self, image1, image2):
 | |
|         if image1.shape[1:] != image2.shape[1:]:
 | |
|             image2 = fcbh.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1)
 | |
|         s = torch.cat((image1, image2), dim=0)
 | |
|         return (s,)
 | |
| 
 | |
| class EmptyImage:
 | |
|     def __init__(self, device="cpu"):
 | |
|         self.device = device
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
 | |
|                               "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
 | |
|                               "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
 | |
|                               "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
 | |
|                               }}
 | |
|     RETURN_TYPES = ("IMAGE",)
 | |
|     FUNCTION = "generate"
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     def generate(self, width, height, batch_size=1, color=0):
 | |
|         r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF)
 | |
|         g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF)
 | |
|         b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF)
 | |
|         return (torch.cat((r, g, b), dim=-1), )
 | |
| 
 | |
| class ImagePadForOutpaint:
 | |
| 
 | |
|     @classmethod
 | |
|     def INPUT_TYPES(s):
 | |
|         return {
 | |
|             "required": {
 | |
|                 "image": ("IMAGE",),
 | |
|                 "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                 "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                 "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                 "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
 | |
|                 "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
 | |
|             }
 | |
|         }
 | |
| 
 | |
|     RETURN_TYPES = ("IMAGE", "MASK")
 | |
|     FUNCTION = "expand_image"
 | |
| 
 | |
|     CATEGORY = "image"
 | |
| 
 | |
|     def expand_image(self, image, left, top, right, bottom, feathering):
 | |
|         d1, d2, d3, d4 = image.size()
 | |
| 
 | |
|         new_image = torch.zeros(
 | |
|             (d1, d2 + top + bottom, d3 + left + right, d4),
 | |
|             dtype=torch.float32,
 | |
|         )
 | |
|         new_image[:, top:top + d2, left:left + d3, :] = image
 | |
| 
 | |
|         mask = torch.ones(
 | |
|             (d2 + top + bottom, d3 + left + right),
 | |
|             dtype=torch.float32,
 | |
|         )
 | |
| 
 | |
|         t = torch.zeros(
 | |
|             (d2, d3),
 | |
|             dtype=torch.float32
 | |
|         )
 | |
| 
 | |
|         if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3:
 | |
| 
 | |
|             for i in range(d2):
 | |
|                 for j in range(d3):
 | |
|                     dt = i if top != 0 else d2
 | |
|                     db = d2 - i if bottom != 0 else d2
 | |
| 
 | |
|                     dl = j if left != 0 else d3
 | |
|                     dr = d3 - j if right != 0 else d3
 | |
| 
 | |
|                     d = min(dt, db, dl, dr)
 | |
| 
 | |
|                     if d >= feathering:
 | |
|                         continue
 | |
| 
 | |
|                     v = (feathering - d) / feathering
 | |
| 
 | |
|                     t[i, j] = v * v
 | |
| 
 | |
|         mask[top:top + d2, left:left + d3] = t
 | |
| 
 | |
|         return (new_image, mask)
 | |
| 
 | |
| 
 | |
| NODE_CLASS_MAPPINGS = {
 | |
|     "KSampler": KSampler,
 | |
|     "CheckpointLoaderSimple": CheckpointLoaderSimple,
 | |
|     "CLIPTextEncode": CLIPTextEncode,
 | |
|     "CLIPSetLastLayer": CLIPSetLastLayer,
 | |
|     "VAEDecode": VAEDecode,
 | |
|     "VAEEncode": VAEEncode,
 | |
|     "VAEEncodeForInpaint": VAEEncodeForInpaint,
 | |
|     "VAELoader": VAELoader,
 | |
|     "EmptyLatentImage": EmptyLatentImage,
 | |
|     "LatentUpscale": LatentUpscale,
 | |
|     "LatentUpscaleBy": LatentUpscaleBy,
 | |
|     "LatentFromBatch": LatentFromBatch,
 | |
|     "RepeatLatentBatch": RepeatLatentBatch,
 | |
|     "SaveImage": SaveImage,
 | |
|     "PreviewImage": PreviewImage,
 | |
|     "LoadImage": LoadImage,
 | |
|     "LoadImageMask": LoadImageMask,
 | |
|     "ImageScale": ImageScale,
 | |
|     "ImageScaleBy": ImageScaleBy,
 | |
|     "ImageInvert": ImageInvert,
 | |
|     "ImageBatch": ImageBatch,
 | |
|     "ImagePadForOutpaint": ImagePadForOutpaint,
 | |
|     "EmptyImage": EmptyImage,
 | |
|     "ConditioningAverage": ConditioningAverage ,
 | |
|     "ConditioningCombine": ConditioningCombine,
 | |
|     "ConditioningConcat": ConditioningConcat,
 | |
|     "ConditioningSetArea": ConditioningSetArea,
 | |
|     "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
 | |
|     "ConditioningSetMask": ConditioningSetMask,
 | |
|     "KSamplerAdvanced": KSamplerAdvanced,
 | |
|     "SetLatentNoiseMask": SetLatentNoiseMask,
 | |
|     "LatentComposite": LatentComposite,
 | |
|     "LatentBlend": LatentBlend,
 | |
|     "LatentRotate": LatentRotate,
 | |
|     "LatentFlip": LatentFlip,
 | |
|     "LatentCrop": LatentCrop,
 | |
|     "LoraLoader": LoraLoader,
 | |
|     "CLIPLoader": CLIPLoader,
 | |
|     "UNETLoader": UNETLoader,
 | |
|     "DualCLIPLoader": DualCLIPLoader,
 | |
|     "CLIPVisionEncode": CLIPVisionEncode,
 | |
|     "StyleModelApply": StyleModelApply,
 | |
|     "unCLIPConditioning": unCLIPConditioning,
 | |
|     "ControlNetApply": ControlNetApply,
 | |
|     "ControlNetApplyAdvanced": ControlNetApplyAdvanced,
 | |
|     "ControlNetLoader": ControlNetLoader,
 | |
|     "DiffControlNetLoader": DiffControlNetLoader,
 | |
|     "StyleModelLoader": StyleModelLoader,
 | |
|     "CLIPVisionLoader": CLIPVisionLoader,
 | |
|     "VAEDecodeTiled": VAEDecodeTiled,
 | |
|     "VAEEncodeTiled": VAEEncodeTiled,
 | |
|     "unCLIPCheckpointLoader": unCLIPCheckpointLoader,
 | |
|     "GLIGENLoader": GLIGENLoader,
 | |
|     "GLIGENTextBoxApply": GLIGENTextBoxApply,
 | |
| 
 | |
|     "CheckpointLoader": CheckpointLoader,
 | |
|     "DiffusersLoader": DiffusersLoader,
 | |
| 
 | |
|     "LoadLatent": LoadLatent,
 | |
|     "SaveLatent": SaveLatent,
 | |
| 
 | |
|     "ConditioningZeroOut": ConditioningZeroOut,
 | |
|     "ConditioningSetTimestepRange": ConditioningSetTimestepRange,
 | |
| }
 | |
| 
 | |
| NODE_DISPLAY_NAME_MAPPINGS = {
 | |
|     # Sampling
 | |
|     "KSampler": "KSampler",
 | |
|     "KSamplerAdvanced": "KSampler (Advanced)",
 | |
|     # Loaders
 | |
|     "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
 | |
|     "CheckpointLoaderSimple": "Load Checkpoint",
 | |
|     "VAELoader": "Load VAE",
 | |
|     "LoraLoader": "Load LoRA",
 | |
|     "CLIPLoader": "Load CLIP",
 | |
|     "ControlNetLoader": "Load ControlNet Model",
 | |
|     "DiffControlNetLoader": "Load ControlNet Model (diff)",
 | |
|     "StyleModelLoader": "Load Style Model",
 | |
|     "CLIPVisionLoader": "Load CLIP Vision",
 | |
|     "UpscaleModelLoader": "Load Upscale Model",
 | |
|     # Conditioning
 | |
|     "CLIPVisionEncode": "CLIP Vision Encode",
 | |
|     "StyleModelApply": "Apply Style Model",
 | |
|     "CLIPTextEncode": "CLIP Text Encode (Prompt)",
 | |
|     "CLIPSetLastLayer": "CLIP Set Last Layer",
 | |
|     "ConditioningCombine": "Conditioning (Combine)",
 | |
|     "ConditioningAverage ": "Conditioning (Average)",
 | |
|     "ConditioningConcat": "Conditioning (Concat)",
 | |
|     "ConditioningSetArea": "Conditioning (Set Area)",
 | |
|     "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
 | |
|     "ConditioningSetMask": "Conditioning (Set Mask)",
 | |
|     "ControlNetApply": "Apply ControlNet",
 | |
|     "ControlNetApplyAdvanced": "Apply ControlNet (Advanced)",
 | |
|     # Latent
 | |
|     "VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
 | |
|     "SetLatentNoiseMask": "Set Latent Noise Mask",
 | |
|     "VAEDecode": "VAE Decode",
 | |
|     "VAEEncode": "VAE Encode",
 | |
|     "LatentRotate": "Rotate Latent",
 | |
|     "LatentFlip": "Flip Latent",
 | |
|     "LatentCrop": "Crop Latent",
 | |
|     "EmptyLatentImage": "Empty Latent Image",
 | |
|     "LatentUpscale": "Upscale Latent",
 | |
|     "LatentUpscaleBy": "Upscale Latent By",
 | |
|     "LatentComposite": "Latent Composite",
 | |
|     "LatentBlend": "Latent Blend",
 | |
|     "LatentFromBatch" : "Latent From Batch",
 | |
|     "RepeatLatentBatch": "Repeat Latent Batch",
 | |
|     # Image
 | |
|     "SaveImage": "Save Image",
 | |
|     "PreviewImage": "Preview Image",
 | |
|     "LoadImage": "Load Image",
 | |
|     "LoadImageMask": "Load Image (as Mask)",
 | |
|     "ImageScale": "Upscale Image",
 | |
|     "ImageScaleBy": "Upscale Image By",
 | |
|     "ImageUpscaleWithModel": "Upscale Image (using Model)",
 | |
|     "ImageInvert": "Invert Image",
 | |
|     "ImagePadForOutpaint": "Pad Image for Outpainting",
 | |
|     "ImageBatch": "Batch Images",
 | |
|     # _for_testing
 | |
|     "VAEDecodeTiled": "VAE Decode (Tiled)",
 | |
|     "VAEEncodeTiled": "VAE Encode (Tiled)",
 | |
| }
 | |
| 
 | |
| EXTENSION_WEB_DIRS = {}
 | |
| 
 | |
| def load_custom_node(module_path, ignore=set()):
 | |
|     module_name = os.path.basename(module_path)
 | |
|     if os.path.isfile(module_path):
 | |
|         sp = os.path.splitext(module_path)
 | |
|         module_name = sp[0]
 | |
|     try:
 | |
|         if os.path.isfile(module_path):
 | |
|             module_spec = importlib.util.spec_from_file_location(module_name, module_path)
 | |
|             module_dir = os.path.split(module_path)[0]
 | |
|         else:
 | |
|             module_spec = importlib.util.spec_from_file_location(module_name, os.path.join(module_path, "__init__.py"))
 | |
|             module_dir = module_path
 | |
| 
 | |
|         module = importlib.util.module_from_spec(module_spec)
 | |
|         sys.modules[module_name] = module
 | |
|         module_spec.loader.exec_module(module)
 | |
| 
 | |
|         if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None:
 | |
|             web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY")))
 | |
|             if os.path.isdir(web_dir):
 | |
|                 EXTENSION_WEB_DIRS[module_name] = web_dir
 | |
| 
 | |
|         if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None:
 | |
|             for name in module.NODE_CLASS_MAPPINGS:
 | |
|                 if name not in ignore:
 | |
|                     NODE_CLASS_MAPPINGS[name] = module.NODE_CLASS_MAPPINGS[name]
 | |
|             if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None:
 | |
|                 NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS)
 | |
|             return True
 | |
|         else:
 | |
|             print(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.")
 | |
|             return False
 | |
|     except Exception as e:
 | |
|         print(traceback.format_exc())
 | |
|         print(f"Cannot import {module_path} module for custom nodes:", e)
 | |
|         return False
 | |
| 
 | |
| def load_custom_nodes():
 | |
|     base_node_names = set(NODE_CLASS_MAPPINGS.keys())
 | |
|     node_paths = folder_paths.get_folder_paths("custom_nodes")
 | |
|     node_import_times = []
 | |
|     for custom_node_path in node_paths:
 | |
|         possible_modules = os.listdir(custom_node_path)
 | |
|         if "__pycache__" in possible_modules:
 | |
|             possible_modules.remove("__pycache__")
 | |
| 
 | |
|         for possible_module in possible_modules:
 | |
|             module_path = os.path.join(custom_node_path, possible_module)
 | |
|             if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue
 | |
|             if module_path.endswith(".disabled"): continue
 | |
|             time_before = time.perf_counter()
 | |
|             success = load_custom_node(module_path, base_node_names)
 | |
|             node_import_times.append((time.perf_counter() - time_before, module_path, success))
 | |
| 
 | |
|     if len(node_import_times) > 0:
 | |
|         print("\nImport times for custom nodes:")
 | |
|         for n in sorted(node_import_times):
 | |
|             if n[2]:
 | |
|                 import_message = ""
 | |
|             else:
 | |
|                 import_message = " (IMPORT FAILED)"
 | |
|             print("{:6.1f} seconds{}:".format(n[0], import_message), n[1])
 | |
|         print()
 | |
| 
 | |
| def init_custom_nodes():
 | |
|     extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "fcbh_extras")
 | |
|     extras_files = [
 | |
|         "nodes_latent.py",
 | |
|         "nodes_hypernetwork.py",
 | |
|         "nodes_upscale_model.py",
 | |
|         "nodes_post_processing.py",
 | |
|         "nodes_mask.py",
 | |
|         "nodes_compositing.py",
 | |
|         "nodes_rebatch.py",
 | |
|         "nodes_model_merging.py",
 | |
|         "nodes_tomesd.py",
 | |
|         "nodes_clip_sdxl.py",
 | |
|         "nodes_canny.py",
 | |
|         "nodes_freelunch.py",
 | |
|         "nodes_custom_sampler.py",
 | |
|         "nodes_hypertile.py",
 | |
|     ]
 | |
| 
 | |
|     for node_file in extras_files:
 | |
|         load_custom_node(os.path.join(extras_dir, node_file))
 | |
| 
 | |
|     load_custom_nodes()
 |