|
|
# File-name suffix (including the leading dot) used to recognise cue sheets.
CUE_SHEET_EXTENSION: str = ".cue"
import os import shutil import logging import subprocess import sys import re
from joblib import Parallel, delayed from configparser import ConfigParser from copy import deepcopy from pathlib import Path from typing import List, Optional, Tuple from ffcuesplitter.cuesplitter import FFCueSplitter
# ---------------------------------------------------------------------------
# Runtime configuration
#
# The path of the INI file is taken from the first command-line argument.
# Every option below is mandatory: a missing section or option makes
# ConfigParser raise while the module is being loaded.
# ---------------------------------------------------------------------------
CONFIG_FILE: str = sys.argv[1]

config = ConfigParser()
config.read(CONFIG_FILE)

# [source] - where the library lives and which files are of interest.
# Extension lists are comma separated; entries are used exactly as written.
SRC_FOLDER: str = config.get("source", "folder")
SRC_FILE_EXTS: List[str] = config.get("source", "file_exts").split(",")
SRC_ICON_EXTS: List[str] = config.get("source", "icon_exts").split(",")

# [destination] - output root and the extension given to converted files.
DST_FOLDER: str = config.get("destination", "folder")
DST_FILE_EXT: str = config.get("destination", "file_ext")

# [libconv] - behaviour switches.
# Overwrite and metadata deduction are only on for the literal "true";
# dry-run is on for anything except the literal "false".
DST_OVERWRITE: bool = config.get("libconv", "overwrite") == "true"
DRY_RUN: bool = config.get("libconv", "dry_run") != "false"
DEDUCE_METADATA: bool = config.get("libconv", "deduce_metadata") == "true"

# [ffmpeg] - executable location and extra command-line options.
FFMPEG_LOCATION: str = config.get("ffmpeg", "location")
FFMPEG_OPTS: str = config.get("ffmpeg", "options")

# Number of parallel jobs used when the queued commands are executed.
THREADS: int = config.getint("libconv", "threads")

# When overwriting is enabled ffmpeg is told to replace existing outputs.
if DST_OVERWRITE:
    FFMPEG_OPTS = f"{FFMPEG_OPTS} -y"

logging.basicConfig(level=logging.INFO)

# Shell commands queued by execute(); run later by execute_command_list().
cmd_list: List[str] = []
# -----------------------------------------------------------------
#
# file and folder operations
def copy(src: str, dst: str):
    """Copy the file ``src`` to ``dst`` unless dry-run mode is active.

    The operation is always logged at debug level; symlinks are followed,
    so the content of the link target is copied.
    """
    logging.debug(f"copy {src} to {dst}")
    if DRY_RUN:
        # Dry run: report only, never touch the file system.
        return
    shutil.copyfile(src, dst, follow_symlinks=True)
def mkdir(path: str):
    """Create ``path`` including missing parents unless dry-run is active.

    An already existing directory is not an error. The request is always
    logged at debug level.
    """
    logging.debug(f"mkdir -r {path}")
    if DRY_RUN:
        # Dry run: report only, never touch the file system.
        return
    os.makedirs(path, exist_ok=True)
def execute(command: str):
    """Queue a shell ``command`` for later execution.

    Nothing is run here: the command is only appended to the module-level
    ``cmd_list``. In dry-run mode the command is logged at debug level but
    not queued.
    """
    logging.debug(f"execute: {command}")
    if DRY_RUN:
        return
    cmd_list.append(command)
# -----------------------------------------------------------------
#
# cue sheet processing
def cue_sheet_processor(path: str):
    """Queue ffmpeg commands that split the album in ``path`` via its cue sheet.

    The destination folder is prepared first; if it already held content and
    overwriting is disabled, the album is skipped. Exactly one cue sheet is
    expected directly inside ``path``. If there is none, more than one, or
    the sheet cannot be parsed, the folder is handed to ``file_processor``.

    NOTE(review): ``file_processor`` calls ``prepare_destination`` again.
    If the first call copied cover images into the destination, the second
    call sees a non-empty folder and - with overwrite disabled - skips the
    album instead of converting it. Confirm whether that is intended.
    """
    dst_folder, preexisting = prepare_destination(path)

    if preexisting and not DST_OVERWRITE:
        logging.info(f"Album {path} already exists in output location and overwrite is disabled - skipping")
        return

    sheet = get_files_with_ext(path, [CUE_SHEET_EXTENSION])

    if len(sheet) != 1:
        logging.error(f"Expected exactly one but {path} contains {len(sheet)} cue sheets - trying file processor")
        file_processor(path)
        return

    try:
        data = FFCueSplitter(
            sheet[0],
            dry=DRY_RUN,
            outputdir=dst_folder,
            suffix=DST_FILE_EXT,
            overwrite=DST_OVERWRITE,
            ffmpeg_cmd=FFMPEG_LOCATION,
            ffmpeg_add_params=FFMPEG_OPTS,
        )
        data.open_cuefile()
        args = data.ffmpeg_arguments()["arguments"]
    except Exception:
        # Deliberately broad: any parsing problem falls back to per-file
        # conversion. Unlike a bare ``except:`` this lets KeyboardInterrupt
        # and SystemExit through, so the user can still abort the run.
        logging.error(f"Error during parsing of cuesheet {path} - trying file processor")
        file_processor(path)
        return

    for arg in args:
        # Cut the generated command at its first "-y" and re-attach the
        # output name below our own destination folder.
        # NOTE(review): presumably every command ends in `-y "<file>"`; the
        # fixed offsets (+5, -1) depend on that exact layout, and a "-y"
        # earlier in the command (e.g. inside a path or in FFMPEG_OPTS)
        # would be picked up instead - verify against ffcuesplitter output.
        arg_end = arg.find("-y")
        filename = arg[arg_end + 5:-1]
        arg = arg[:arg_end]
        execute(f"{arg} \"{dst_folder}{filename}\"")
# -----------------------------------------------------------------
#
# basic "folder contains audio files" processing
def metadata_from_folder(path: str) -> str:
    """Build ffmpeg ``-metadata`` options from the folder names in ``path``.

    This method has to be adapted to your individual folder structure; if
    there is none, leave ``DEDUCE_METADATA`` off (the default use case) and
    an empty string is returned.

    The last three components of the normalised path are read as
    ``<genre>/<artist>/<album>``. Parenthesised parts of the album folder
    name (with any spaces in front of them) are removed from the album name;
    what is left over, stripped of parentheses and spaces, becomes the
    comment.

    Raises:
        IndexError: if the path has fewer than three components.
    """
    if not DEDUCE_METADATA:
        return ""

    parts = os.path.normpath(path).split(os.sep)

    logging.debug(f"deducing metadata from folders: {parts}")

    leaf = parts[-1]
    album_name = re.sub(r'[ ]*\(.*?\)', '', leaf)
    # Whatever the album name does not cover is the comment; drop "(", ")"
    # and spaces from it in a single pass.
    comment = leaf.replace(album_name, "").translate(str.maketrans("", "", "() "))

    artist = parts[-2]
    genre = parts[-3]

    logging.debug(f"got metadata: album={album_name} comment={comment} artist={artist} genre={genre}")
    tags = (
        ("ALBUM", album_name),
        ("COMMENT", comment),
        ("ARTIST", artist),
        ("GENRE", genre),
    )
    return " ".join(f'-metadata {key}="{value}"' for key, value in tags)
# Track-number prefixes recognised at the start of a file name, in the order
# they are tried: "<digits> - " and "<digits>. " (spaces optional).
_TRACK_PREFIX_PATTERNS = (
    re.compile(r'([0-9]+)[ ]*-[ ]*'),
    re.compile(r'([0-9]+)\.[ ]*'),
)


def metadata_from_file(filename: str) -> str:
    """Build ffmpeg ``-metadata`` options for title and track number.

    ``filename`` is expected without directory and extension. If it starts
    with a track number followed by ``-`` or ``.``, the digits become the
    track number and the rest of the name becomes the title. Otherwise both
    values are left empty.

    Only the leading prefix is stripped: a title that happens to repeat the
    prefix text (e.g. ``"01 - Take 01 - Live"``) is kept intact.

    Returns:
        A string of the form ``-metadata TITLE="..." -metadata TRACK="..."``.
    """
    title = ""
    number = ""

    for index, pattern in enumerate(_TRACK_PREFIX_PATTERNS, start=1):
        match = pattern.match(filename)
        if match:
            logging.debug(f"file {filename} matched metadata regex #{index}")
            number = match.group(1)
            # Slice instead of str.replace so later repetitions of the
            # prefix inside the title are not removed as well.
            title = filename[match.end():]
            break
    else:
        logging.debug(f"file {filename} matched no metadata regex")

    return f"-metadata TITLE=\"{title}\" -metadata TRACK=\"{number}\""
def file_processor(path: str):
    """Queue one ffmpeg conversion per audio file found directly in ``path``.

    The destination folder is prepared first; if it already held content and
    overwriting is disabled the album is skipped. Album-level metadata comes
    from the folder names, track-level metadata from each file name.
    """
    target_dir, already_there = prepare_destination(path)
    if already_there and not DST_OVERWRITE:
        logging.info(f"Album {path} already exists in output location and overwrite is disabled - skipping")
        return

    sources = get_files_with_ext(path, SRC_FILE_EXTS)
    album_tags = metadata_from_folder(path)

    for source in sources:
        stem = get_filename(source, path)
        # stem[1:] drops the first character - presumably the path
        # separator left in front of the name by get_filename().
        track_tags = metadata_from_file(stem[1:])
        output = f"{target_dir + stem}.{DST_FILE_EXT}"
        execute(f'"{FFMPEG_LOCATION}" -i "{source}" {FFMPEG_OPTS} {album_tags} {track_tags} "{output}"')
# -----------------------------------------------------------------
#
# Iteration over library folders and preparation
def contains_extension(path: str, extensions: List[str]) -> bool:
    """Tell whether ``path`` directly contains a file with a given ending.

    Only the files in ``path`` itself are inspected; sub-folders are not
    searched. A file matches when its name ends with any entry of
    ``extensions``. A path that cannot be listed yields ``False``.
    """
    logging.debug(f"searching for extensions {extensions} in {path}")

    # Taking just the first os.walk() entry keeps the search non-recursive.
    _top, _subdirs, names = next(os.walk(path), (path, [], []))

    for name in names:
        matched = next((ext for ext in extensions if name.endswith(ext)), None)
        if matched is not None:
            logging.debug(f"file {name} matched extension '{matched}'")
            return True
    return False
def get_files_with_ext(path: str, extensions: List[str]) -> List[str]:
    """Return the files directly inside ``path`` that end with an extension.

    Sub-folders are not searched. Each returned entry is ``path`` joined
    with the file name, and a file is listed once even if several entries of
    ``extensions`` match it. A path that cannot be listed yields an empty
    list.
    """
    logging.debug(f"obtainint all files with extensions {extensions} from {path}")

    # Taking just the first os.walk() entry keeps the search non-recursive.
    top, _subdirs, names = next(os.walk(path), (path, [], []))

    found: List[str] = []
    for name in names:
        # Only the first matching extension matters (and gets logged).
        matched = next((ext for ext in extensions if name.endswith(ext)), None)
        if matched is None:
            continue
        logging.debug(f"file {name} matched extension '{matched}'")
        found.append(os.path.join(top, name))
    return found
def prepare_destination(path: str) -> Tuple[str, bool]:
    """Create the output folder mirroring ``path`` and copy cover images.

    The destination is ``DST_FOLDER`` plus ``path`` with the source root
    removed. Image files (``SRC_ICON_EXTS``) found directly in ``path`` are
    copied next to the future audio files.

    Returns:
        ``(destination_folder, existing)`` where ``existing`` tells whether
        the destination already contained entries *before* the images were
        copied.
    """
    images = get_files_with_ext(path, SRC_ICON_EXTS)

    # count=1: remove only the first occurrence of the source root, so a
    # sub-folder that repeats the same text further down stays intact.
    sub_path = path.replace(SRC_FOLDER, "", 1)
    new_path = DST_FOLDER + sub_path
    mkdir(new_path)

    # mkdir() creates nothing in dry-run mode, so the folder may be missing;
    # a missing folder simply counts as "no pre-existing content" instead of
    # letting os.listdir() raise FileNotFoundError.
    existing: bool = os.path.isdir(new_path) and len(os.listdir(new_path)) != 0

    for img in images:
        img_name = img.replace(path, "", 1)
        new_img_path = new_path + img_name
        copy(img, new_img_path)

    return (new_path, existing)


def get_filename(file: str, path: str) -> str:
    """Return ``file`` relative to ``path`` without its audio extension.

    The first occurrence of ``path`` is cut from ``file``; whatever text
    followed it (typically a leading path separator plus the name) is kept.
    Only a trailing ``.<ext>`` for the first matching entry of
    ``SRC_FILE_EXTS`` is removed - the same text elsewhere in the name is
    left alone.
    """
    fname = file.replace(path, "", 1)
    for ext in SRC_FILE_EXTS:
        suffix = "." + ext
        if fname.endswith(suffix):
            return fname[:-len(suffix)]
    return fname
def scan_folder(path: str):
    """Run ``process_folder`` on every immediate sub-folder of ``path``.

    Only the first level is enumerated here; deeper levels are reached
    through ``process_folder`` itself where it decides to scan further.
    """
    # The first os.walk() entry describes ``path`` itself; stopping there
    # prevents a recursive search.
    first_level = next(os.walk(path), None)
    if first_level is None:
        return

    root, subdirs, _files = first_level
    for directory in subdirs:
        process_folder(os.path.join(root, directory))
def process_folder(path: str):
    """Decide how ``path`` is handled and dispatch accordingly.

    Priority: a folder holding a cue sheet goes to the cue sheet processor,
    a folder holding audio files goes to the file processor, anything else
    has its sub-folders scanned.
    """
    logging.info(f"Visiting folder '{path}'")

    if contains_extension(path, [CUE_SHEET_EXTENSION]):
        logging.info("found cuesheet")
        cue_sheet_processor(path)
        return

    if contains_extension(path, SRC_FILE_EXTS):
        logging.info("found audio files")
        file_processor(path)
        return

    logging.info("scanning subfolders")
    scan_folder(path)
def execute_command_list():
    """Run every queued command, ``THREADS`` joblib jobs at a time.

    In dry-run mode nothing is executed; the queue is only dumped at debug
    level. Otherwise each command is passed to ``os.system`` and every
    command that finishes with a non-zero status is reported at error level,
    so failed conversions do not go unnoticed.
    """
    logging.info(f"Executing all {len(cmd_list)} commands with {THREADS} parallel threads")

    if DRY_RUN:
        logging.info("Skipping execution as we are dry-running. Printing list as debug-info.")
        logging.debug(str(cmd_list))
        return

    exit_codes = Parallel(n_jobs=THREADS)(
        delayed(os.system)(cmd) for cmd in cmd_list
    )

    # Results come back in submission order, so they pair up with cmd_list.
    failed = [(cmd, code) for cmd, code in zip(cmd_list, exit_codes) if code != 0]
    for cmd, code in failed:
        logging.error(f"Command exited with status {code}: {cmd}")
    if failed:
        logging.error(f"{len(failed)} of {len(cmd_list)} commands failed")
if __name__ == "__main__":
    # Announce the effective settings before any work is done.
    logging.info(f"Using settings from {CONFIG_FILE}")

    for enabled, notice in (
        (DRY_RUN, "Executing as DRY RUN"),
        (DST_OVERWRITE, "Overwrite of existing files active"),
    ):
        if enabled:
            logging.info(notice)

    # First collect all conversion commands, then run them in parallel.
    process_folder(SRC_FOLDER)
    execute_command_list()
|