import torch
from fcbh.ldm.modules.diffusionmodules.openaimodel import UNetModel, Timestep
from fcbh.ldm.modules.encoders.noise_aug_modules import CLIPEmbeddingNoiseAugmentation
from fcbh.ldm.modules.diffusionmodules.util import make_beta_schedule
import fcbh.model_management
import numpy as np
from enum import Enum
from . import utils
class ModelType(Enum):
    EPS = 1
    V_PREDICTION = 2
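
# Wraps the UNet denoiser together with its noise schedule, latent scaling
# rules and weight load/save logic. model_config is expected to provide
# unet_config, latent_format, beta_schedule and the process_* saving hooks.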
class BaseModel(torch.nn.Module):
    def __init__(self, model_config, model_type=ModelType.EPS, device=None):
        super().__init__()

        unet_config = model_config.unet_config
        self.latent_format = model_config.latent_format
        self.model_config = model_config
        self.register_schedule(given_betas=None, beta_schedule=model_config.beta_schedule, timesteps=1000, linear_start=0.00085, linear_end=0.012, cosine_s=8e-3)
        if not unet_config.get("disable_unet_model_creation", False):
            self.diffusion_model = UNetModel(**unet_config, device=device)
        self.model_type = model_type
        self.adm_channels = unet_config.get("adm_in_channels", None)
        if self.adm_channels is None:
            self.adm_channels = 0
        print("model_type", model_type.name)
        print("adm", self.adm_channels)
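
    # Precompute the DDPM schedule: betas, alphas = 1 - betas, and the
    # cumulative products alpha_bar_t = prod_{s<=t}(1 - beta_s) that samplers
    # use to convert between timesteps and noise levels.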
    def register_schedule(self, given_betas=None, beta_schedule="linear", timesteps=1000,
                          linear_start=1e-4, linear_end=2e-2, cosine_s=8e-3):
        if given_betas is not None:
            betas = given_betas
        else:
            betas = make_beta_schedule(beta_schedule, timesteps, linear_start=linear_start, linear_end=linear_end, cosine_s=cosine_s)
        alphas = 1. - betas
        alphas_cumprod = np.cumprod(alphas, axis=0)
        alphas_cumprod_prev = np.append(1., alphas_cumprod[:-1])

        timesteps, = betas.shape
        self.num_timesteps = int(timesteps)
        self.linear_start = linear_start
        self.linear_end = linear_end

        self.register_buffer('betas', torch.tensor(betas, dtype=torch.float32))
        self.register_buffer('alphas_cumprod', torch.tensor(alphas_cumprod, dtype=torch.float32))
        self.register_buffer('alphas_cumprod_prev', torch.tensor(alphas_cumprod_prev, dtype=torch.float32))
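
    # Run one denoising step. c_concat is joined to x along the channel axis
    # (inpainting models), c_crossattn feeds the cross-attention layers, and
    # c_adm is the extra pooled vector for models with adm_in_channels > 0.
    # A hypothetical call, assuming cond/adm tensors prepared by a sampler:
    #   eps = model.apply_model(x, t, c_crossattn=cond, c_adm=adm)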
    def apply_model(self, x, t, c_concat=None, c_crossattn=None, c_adm=None, control=None, transformer_options={}):
        if c_concat is not None:
            xc = torch.cat([x, c_concat], dim=1)
        else:
            xc = x
        context = c_crossattn
        dtype = self.get_dtype()
        xc = xc.to(dtype)
        t = t.to(dtype)
        context = context.to(dtype)
        if c_adm is not None:
            c_adm = c_adm.to(dtype)
        return self.diffusion_model(xc, t, context=context, y=c_adm, control=control, transformer_options=transformer_options).float()
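
    # Small helpers: get_dtype() reports the UNet's parameter dtype, is_adm()
    # says whether this model expects an ADM vector, and encode_adm() builds
    # that vector (the base model has none; subclasses override it).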
    def get_dtype(self):
        return self.diffusion_model.dtype

    def is_adm(self):
        return self.adm_channels > 0

    def encode_adm(self, **kwargs):
        return None
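
    # Load UNet weights from a full checkpoint state dict, stripping
    # unet_prefix from the keys first. Matched keys are popped from sd so the
    # caller can hand the remainder to the CLIP/VAE loaders. A typical prefix
    # (an assumption; it depends on the checkpoint layout) is
    # "model.diffusion_model.".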
    def load_model_weights(self, sd, unet_prefix=""):
        to_load = {}
        keys = list(sd.keys())
        for k in keys:
            if k.startswith(unet_prefix):
                to_load[k[len(unet_prefix):]] = sd.pop(k)

        m, u = self.diffusion_model.load_state_dict(to_load, strict=False)
        if len(m) > 0:
            print("unet missing:", m)
        if len(u) > 0:
            print("unet unexpected:", u)
        del to_load
        return self
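
    # The latent_format object applies the model-specific latent scale/shift
    # (for SD-style models, typically the 0.18215 scale factor) when moving
    # latents between the VAE and the UNet.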
    def process_latent_in(self, latent):
        return self.latent_format.process_in(latent)

    def process_latent_out(self, latent):
        return self.latent_format.process_out(latent)
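
    # Merge the UNet, VAE and CLIP weights back into a single checkpoint
    # state dict, letting model_config rename keys to the on-disk layout.
    # The empty "v_pred" tensor marks v-prediction checkpoints for loaders.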
    def state_dict_for_saving(self, clip_state_dict, vae_state_dict):
        clip_state_dict = self.model_config.process_clip_state_dict_for_saving(clip_state_dict)
        unet_sd = self.diffusion_model.state_dict()
        unet_state_dict = {}
        for k in unet_sd:
            unet_state_dict[k] = fcbh.model_management.resolve_lowvram_weight(unet_sd[k], self.diffusion_model, k)

        unet_state_dict = self.model_config.process_unet_state_dict_for_saving(unet_state_dict)
        vae_state_dict = self.model_config.process_vae_state_dict_for_saving(vae_state_dict)
        if self.get_dtype() == torch.float16:
            clip_state_dict = utils.convert_sd_to(clip_state_dict, torch.float16)
            vae_state_dict = utils.convert_sd_to(vae_state_dict, torch.float16)

        if self.model_type == ModelType.V_PREDICTION:
            unet_state_dict["v_pred"] = torch.tensor([])

        return {**unet_state_dict, **vae_state_dict, **clip_state_dict}
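
    # Mark this as an inpainting model: samplers read concat_keys to know
    # which extra tensors to concatenate onto the latent via c_concat.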
    def set_inpaint(self):
        self.concat_keys = ("mask", "masked_image")
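
# Build the unCLIP ADM vector: each CLIP vision embedding is noise-augmented
# at its requested noise_augmentation level, concatenated with the matching
# noise-level embedding, and scaled by its strength; multiple conditionings
# are summed and then re-augmented once at noise_augment_merge.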
def unclip_adm(unclip_conditioning, device, noise_augmentor, noise_augment_merge=0.0):
    adm_inputs = []
    noise_aug = []
    for unclip_cond in unclip_conditioning:
        for adm_cond in unclip_cond["clip_vision_output"].image_embeds:
            weight = unclip_cond["strength"]
            noise_augment = unclip_cond["noise_augmentation"]
            noise_level = round((noise_augmentor.max_noise_level - 1) * noise_augment)
            c_adm, noise_level_emb = noise_augmentor(adm_cond.to(device), noise_level=torch.tensor([noise_level], device=device))
            adm_out = torch.cat((c_adm, noise_level_emb), 1) * weight
            noise_aug.append(noise_augment)
            adm_inputs.append(adm_out)

    if len(noise_aug) > 1:
        adm_out = torch.stack(adm_inputs).sum(0)
        noise_level = round((noise_augmentor.max_noise_level - 1) * noise_augment_merge)
        c_adm, noise_level_emb = noise_augmentor(adm_out[:, :noise_augmentor.time_embed.dim], noise_level=torch.tensor([noise_level], device=device))
        adm_out = torch.cat((c_adm, noise_level_emb), 1)

    return adm_out
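
# SD 2.x unCLIP model: a v-prediction UNet plus a CLIPEmbeddingNoiseAugmentation
# module, with its ADM vector built from image-embedding conditionings above.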
class SD21UNCLIP(BaseModel):
    def __init__(self, model_config, noise_aug_config, model_type=ModelType.V_PREDICTION, device=None):
        super().__init__(model_config, model_type, device=device)
        self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**noise_aug_config)

    def encode_adm(self, **kwargs):
        unclip_conditioning = kwargs.get("unclip_conditioning", None)
        device = kwargs["device"]
        if unclip_conditioning is None:
            return torch.zeros((1, self.adm_channels))
        else:
            return unclip_adm(unclip_conditioning, device, self.noise_augmentor, kwargs.get("unclip_noise_augment_merge", 0.05))
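
# SDXL conditions on a 1280-dim pooled text embedding; when unCLIP image
# conditioning is supplied instead, the first 1280 channels of the unCLIP ADM
# vector stand in for it.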
def sdxl_pooled(args, noise_augmentor):
    if "unclip_conditioning" in args:
        return unclip_adm(args.get("unclip_conditioning", None), args["device"], noise_augmentor)[:, :1280]
    else:
        return args["pooled_output"]
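
# SDXL refiner ADM: the pooled embedding concatenated with Timestep(256)
# encodings of (height, width, crop_h, crop_w, aesthetic_score), giving
# 1280 + 5 * 256 = 2560 channels. The aesthetic score default differs for
# negative (2.5) and positive (6) prompts.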
class SDXLRefiner(BaseModel):
    def __init__(self, model_config, model_type=ModelType.EPS, device=None):
        super().__init__(model_config, model_type, device=device)
        self.embedder = Timestep(256)
        self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**{"noise_schedule_config": {"timesteps": 1000, "beta_schedule": "squaredcos_cap_v2"}, "timestep_dim": 1280})

    def encode_adm(self, **kwargs):
        clip_pooled = sdxl_pooled(kwargs, self.noise_augmentor)
        width = kwargs.get("width", 768)
        height = kwargs.get("height", 768)
        crop_w = kwargs.get("crop_w", 0)
        crop_h = kwargs.get("crop_h", 0)

        if kwargs.get("prompt_type", "") == "negative":
            aesthetic_score = kwargs.get("aesthetic_score", 2.5)
        else:
            aesthetic_score = kwargs.get("aesthetic_score", 6)

        out = []
        out.append(self.embedder(torch.Tensor([height])))
        out.append(self.embedder(torch.Tensor([width])))
        out.append(self.embedder(torch.Tensor([crop_h])))
        out.append(self.embedder(torch.Tensor([crop_w])))
        out.append(self.embedder(torch.Tensor([aesthetic_score])))
        flat = torch.flatten(torch.cat(out)).unsqueeze(dim=0).repeat(clip_pooled.shape[0], 1)
        return torch.cat((clip_pooled.to(flat.device), flat), dim=1)
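
# Base SDXL ADM: the pooled embedding concatenated with Timestep(256)
# encodings of (height, width, crop_h, crop_w, target_height, target_width),
# giving 1280 + 6 * 256 = 2816 channels.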
class SDXL(BaseModel):
    def __init__(self, model_config, model_type=ModelType.EPS, device=None):
        super().__init__(model_config, model_type, device=device)
        self.embedder = Timestep(256)
        self.noise_augmentor = CLIPEmbeddingNoiseAugmentation(**{"noise_schedule_config": {"timesteps": 1000, "beta_schedule": "squaredcos_cap_v2"}, "timestep_dim": 1280})

    def encode_adm(self, **kwargs):
        clip_pooled = sdxl_pooled(kwargs, self.noise_augmentor)
        width = kwargs.get("width", 768)
        height = kwargs.get("height", 768)
        crop_w = kwargs.get("crop_w", 0)
        crop_h = kwargs.get("crop_h", 0)
        target_width = kwargs.get("target_width", width)
        target_height = kwargs.get("target_height", height)

        out = []
        out.append(self.embedder(torch.Tensor([height])))
        out.append(self.embedder(torch.Tensor([width])))
        out.append(self.embedder(torch.Tensor([crop_h])))
        out.append(self.embedder(torch.Tensor([crop_w])))
        out.append(self.embedder(torch.Tensor([target_height])))
        out.append(self.embedder(torch.Tensor([target_width])))
        flat = torch.flatten(torch.cat(out)).unsqueeze(dim=0).repeat(clip_pooled.shape[0], 1)
        return torch.cat((clip_pooled.to(flat.device), flat), dim=1)
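
# Minimal usage sketch (hypothetical: the model_config object, checkpoint key
# layout and tensor shapes below are assumptions, not defined in this file):
#
#   model = SDXL(model_config, device="cuda")
#   model.load_model_weights(state_dict, unet_prefix="model.diffusion_model.")
#   adm = model.encode_adm(pooled_output=pooled, width=1024, height=1024,
#                          crop_w=0, crop_h=0,
#                          target_width=1024, target_height=1024)
#   eps = model.apply_model(x, t, c_crossattn=cond, c_adm=adm)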