"""Configuration utilities for Open-Sora (``opensora/utils/config_utils.py``).

Command-line parsing, config merging, experiment-workspace creation and
TensorBoard writer helpers.

Source snapshot: 2024-04-30 06:19:40 +0000, 180 lines, 7.2 KiB, Python.
"""
import argparse
import json
import os
from glob import glob
from mmengine.config import Config
from torch.utils.tensorboard import SummaryWriter
def load_prompts(prompt_path):
    """Read prompts from a text file, one prompt per line.

    Args:
        prompt_path: path to a UTF-8 text file of prompts.

    Returns:
        list[str]: each line with surrounding whitespace stripped
        (blank lines are kept as empty strings, as before).
    """
    # Explicit encoding: the platform default may not be UTF-8, which would
    # mis-decode non-ASCII prompts. Iterating the handle avoids readlines().
    with open(prompt_path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f]
def parse_args(training=False):
    """Build the CLI parser and parse ``sys.argv``.

    Args:
        training: when True, expose training-only flags; otherwise expose
            inference-only flags. (``--data-path`` exists in both modes.)

    Returns:
        argparse.Namespace: parsed options; unspecified options are None
        so ``merge_args`` can tell "not given" from a real value.
    """

    def str2bool(value):
        # Fix for the argparse ``type=bool`` footgun: bool("False") is True
        # because any non-empty string is truthy, so ``--wandb False`` used
        # to enable wandb. Parse common spellings explicitly instead.
        if isinstance(value, bool):
            return value
        if value.lower() in ("true", "t", "yes", "y", "1"):
            return True
        if value.lower() in ("false", "f", "no", "n", "0"):
            return False
        raise argparse.ArgumentTypeError(f"boolean value expected, got {value!r}")

    parser = argparse.ArgumentParser()
    # model config
    parser.add_argument("config", help="model config file path")

    # ======================================================
    # General
    # ======================================================
    parser.add_argument("--seed", default=42, type=int, help="generation seed")
    parser.add_argument("--ckpt-path", type=str, help="path to model ckpt; will overwrite cfg.ckpt_path if specified")
    parser.add_argument("--batch-size", default=None, type=int, help="batch size")
    parser.add_argument("--outputs", default=None, type=str, help="the dir to save model weights")

    # ======================================================
    # Inference
    # ======================================================
    if not training:
        # output
        parser.add_argument("--save-dir", default=None, type=str, help="path to save generated samples")
        parser.add_argument("--sample-name", default=None, type=str, help="sample name, default is sample_idx")
        parser.add_argument("--start-index", default=None, type=int, help="start index for sample name")
        parser.add_argument("--end-index", default=None, type=int, help="end index for sample name")
        parser.add_argument("--num-sample", default=None, type=int, help="number of samples to generate for one prompt")
        parser.add_argument("--prompt-as-path", action="store_true", help="use prompt as path to save samples")
        parser.add_argument("--data-path", default=None, type=str, help="path to data csv")

        # prompt
        parser.add_argument("--prompt-path", default=None, type=str, help="path to prompt txt file")
        parser.add_argument("--prompt", default=None, type=str, nargs="+", help="prompt list")

        # image/video
        parser.add_argument("--num-frames", default=None, type=int, help="number of frames")
        parser.add_argument("--fps", default=None, type=int, help="fps")
        parser.add_argument("--image-size", default=None, type=int, nargs=2, help="image size")

        # hyperparameters
        parser.add_argument("--num-sampling-steps", default=None, type=int, help="sampling steps")
        parser.add_argument("--cfg-scale", default=None, type=float, help="balance between cond & uncond")

        # reference
        parser.add_argument("--loop", default=None, type=int, help="loop")
        parser.add_argument("--condition-frame-length", default=None, type=int, help="condition frame length")
        parser.add_argument("--reference-path", default=None, type=str, nargs="+", help="reference path")
        parser.add_argument("--mask-strategy", default=None, type=str, nargs="+", help="mask strategy")

    # ======================================================
    # Training
    # ======================================================
    else:
        parser.add_argument("--wandb", default=None, type=str2bool, help="enable wandb")
        parser.add_argument("--load", default=None, type=str, help="path to continue training")
        parser.add_argument("--data-path", default=None, type=str, help="path to data csv")
        parser.add_argument("--start-from-scratch", action="store_true", help="start training from scratch")

    return parser.parse_args()
def merge_args(cfg, args, training=False):
    """Merge command-line overrides from ``args`` into the config ``cfg``.

    A few options are routed into nested config sections first and then
    cleared on ``args`` so the generic copy loop below does not also write
    them at the top level. Afterwards, defaults are filled in for keys the
    config file may omit.

    Args:
        cfg: dict-like config (e.g. mmengine ``Config``) supporting item
            access, ``in``, ``get`` and attribute access for sub-sections.
        args: ``argparse.Namespace`` produced by ``parse_args``.
        training: selects which set of defaults to fill in.

    Returns:
        The mutated ``cfg``.
    """
    if args.ckpt_path is not None:
        cfg.model["from_pretrained"] = args.ckpt_path
        # Keep an auxiliary discriminator (if configured) on the same ckpt.
        if cfg.get("discriminator") is not None:
            cfg.discriminator["from_pretrained"] = args.ckpt_path
        args.ckpt_path = None
    if args.data_path is not None:
        cfg.dataset["data_path"] = args.data_path
        args.data_path = None
    if not training and args.cfg_scale is not None:
        cfg.scheduler["cfg_scale"] = args.cfg_scale
        args.cfg_scale = None
    if not training and args.num_sampling_steps is not None:
        cfg.scheduler["num_sampling_steps"] = args.num_sampling_steps
        args.num_sampling_steps = None

    # Generic copy: every remaining non-None CLI option overrides cfg.
    for k, v in vars(args).items():
        if v is not None:
            cfg[k] = v

    if not training:
        # Inference only
        # - Allow not set
        if "reference_path" not in cfg:
            cfg["reference_path"] = None
        if "loop" not in cfg:
            cfg["loop"] = 1
        if "frame_interval" not in cfg:
            cfg["frame_interval"] = 1
        if "sample_name" not in cfg:
            cfg["sample_name"] = None
        if "num_sample" not in cfg:
            cfg["num_sample"] = 1
        if "prompt_as_path" not in cfg:
            cfg["prompt_as_path"] = False
        # - Prompt handling: when no prompts were given directly, try to
        #   load them from prompt_path. (The original nested ``if``
        #   re-tested the enclosing condition verbatim — removed.)
        if "prompt" not in cfg or cfg["prompt"] is None:
            if cfg.get("prompt_path", None) is not None:
                cfg["prompt"] = load_prompts(cfg["prompt_path"])
            # One slice covers every start/end combination, because a
            # bound of None leaves that end of the slice open.
            if args.start_index is not None or args.end_index is not None:
                cfg["prompt"] = cfg["prompt"][args.start_index : args.end_index]
    else:
        # Training only
        # - Allow not set
        if "mask_ratios" not in cfg:
            cfg["mask_ratios"] = None
        if "start_from_scratch" not in cfg:
            cfg["start_from_scratch"] = False
        if "bucket_config" not in cfg:
            cfg["bucket_config"] = None
        if "transform_name" not in cfg.dataset:
            cfg.dataset["transform_name"] = "center"
        if "num_bucket_build_workers" not in cfg:
            cfg["num_bucket_build_workers"] = 1

    # Both training and inference
    if "multi_resolution" not in cfg:
        cfg["multi_resolution"] = False

    return cfg
def parse_configs(training=False):
    """Parse the CLI, load the config file it names, and apply overrides.

    Args:
        training: forwarded to ``parse_args``/``merge_args`` to select the
            training vs. inference option sets and defaults.

    Returns:
        The merged mmengine ``Config``.
    """
    cli_args = parse_args(training)
    config = Config.fromfile(cli_args.config)
    return merge_args(config, cli_args, training)
def create_experiment_workspace(cfg, get_last_workspace=False):
    """Create (or locate) a numbered experiment folder under ``cfg.outputs``.

    The folder name is ``NNN-<model type>`` where ``NNN`` is the count of
    entries already present in the outputs directory.

    Args:
        cfg: config exposing ``outputs`` (root dir) and ``model["type"]``.
        get_last_workspace: reuse the most recently created folder instead
            of allocating the next index.

    Returns:
        tuple[str, str]: ``(exp_name, exp_dir)`` — folder name and full path.
    """
    root = cfg.outputs
    # Root folder that holds every experiment subfolder.
    os.makedirs(root, exist_ok=True)
    # Next index = number of entries already present under the root.
    index = len(glob(f"{root}/*"))
    if get_last_workspace:
        index -= 1
    # "/" would create nested directories, so it is replaced in the name.
    sanitized_model = cfg.model["type"].replace("/", "-")
    exp_name = f"{index:03d}-{sanitized_model}"
    exp_dir = f"{root}/{exp_name}"
    os.makedirs(exp_dir, exist_ok=True)
    return exp_name, exp_dir
def save_training_config(cfg, experiment_dir):
    """Write the training config as indented JSON to ``<experiment_dir>/config.txt``.

    Args:
        cfg: JSON-serializable (dict-like) config to persist.
        experiment_dir: existing directory to write ``config.txt`` into.
    """
    target = f"{experiment_dir}/config.txt"
    with open(target, "w") as out:
        json.dump(cfg, out, indent=4)
def create_tensorboard_writer(exp_dir):
    """Create a TensorBoard ``SummaryWriter`` logging under ``exp_dir``/tensorboard.

    Args:
        exp_dir: experiment directory; a ``tensorboard`` subfolder is
            created inside it if missing.

    Returns:
        torch.utils.tensorboard.SummaryWriter bound to that subfolder.
    """
    log_dir = f"{exp_dir}/tensorboard"
    os.makedirs(log_dir, exist_ok=True)
    return SummaryWriter(log_dir)