""" First release: January 16 2022 Name: cuesplitter.py Porpose: FFmpeg based audio splitter for Cue sheet files Platform: MacOs, Gnu/Linux, FreeBSD Writer: jeanslack license: GPL3 Rev: February 06 2022 Code checker: flake8 and pylint #################################################################### This file is part of FFcuesplitter. FFcuesplitter is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. FFcuesplitter is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with FFcuesplitter. If not, see . """ import os import shutil import tempfile import chardet from deflacue.deflacue import CueParser from ffcuesplitter.str_utils import msgdebug, msg from ffcuesplitter.exceptions import (InvalidFileError, FFCueSplitterError, ) from ffcuesplitter.ffprobe import ffprobe from ffcuesplitter.ffmpeg import FFMpeg class FFCueSplitter(FFMpeg): """ This class implements an interface for retrieve the required data to accurately split audio CD images using FFmpeg. Usage: >>> from ffcuesplitter.cuesplitter import FFCueSplitter Splittings: >>> split = FFCueSplitter('/home/user/my_file.cue') >>> split.open_cuefile() >>> split.do_operations() Get data tracks: >>> data = FFCueSplitter('/home/user/other.cue', dry=True) >>> data.open_cuefile() >>> data.audiotracks # trackdata >>> data.cue.meta.data # cd_info >>> data.ffmpeg_arguments() Only processing the track three: >>> myfile = FFCueSplitter('/home/user/my_file.cue', progress_meter='tqdm') >>> f.open_cuefile() >>> f.kwargs['tempdir'] = '/tmp/mytempdir' >>> f.ffmpeg_arguments() >>> f.processing(myfile.arguments[2], myfile.seconds[2]) >>> f.move_files_to_outputdir() For a full meaning of the arguments to pass to the instance, read the __init__ docstring of this class. """ def __init__(self, filename, outputdir=str('.'), suffix=str('flac'), overwrite=str('ask'), ffmpeg_cmd=str('ffmpeg'), ffmpeg_loglevel=str('info'), ffprobe_cmd=str('ffprobe'), ffmpeg_add_params=str(''), progress_meter=str('standard'), dry=bool(False) ): """ ------------------ Arguments meaning: ------------------ filename: absolute or relative CUE sheet file outputdir: absolute or relative pathname to output files suffix: output format, one of ("wav", "flac", "mp3", "ogg") . overwrite: overwriting options, one of "ask", "never", "always". ffmpeg_cmd: an absolute path command of ffmpeg ffmpeg_loglevel: one of "error", "warning", "info", "verbose", "debug" . ffprobe_cmd: an absolute path command of ffprobe. ffmpeg_add_params: additionals parameters of FFmpeg. progress_meter: one of 'tqdm', 'standard', default is 'standard'. dry: with `True`, perform the dry run with no changes done to filesystem. """ super().__init__() self.kwargs = {'filename': os.path.abspath(filename)} self.kwargs['dirname'] = os.path.dirname(self.kwargs['filename']) if outputdir == '.': self.kwargs['outputdir'] = self.kwargs['dirname'] else: self.kwargs['outputdir'] = os.path.abspath(outputdir) self.kwargs['format'] = suffix self.kwargs['overwrite'] = overwrite self.kwargs['ffmpeg_cmd'] = ffmpeg_cmd self.kwargs['ffmpeg_loglevel'] = ffmpeg_loglevel self.kwargs['ffprobe_cmd'] = ffprobe_cmd self.kwargs['ffmpeg_add_params'] = ffmpeg_add_params self.kwargs['progress_meter'] = progress_meter self.kwargs['dry'] = dry self.kwargs['logtofile'] = os.path.join(self.kwargs['dirname'], 'ffcuesplitter.log') self.kwargs['tempdir'] = '.' self.audiotracks = None self.probedata = [] self.cue_encoding = None # data chardet self.cue = None self.testpatch = None # set for test cases only # ----------------------------------------------------------------# def move_files_to_outputdir(self): """ All files are processed in a /temp folder. After the split operation is complete, all tracks are moved from /temp folder to output folder. Here evaluates what to do if files already exists on output folder. Raises: FFCueSplitterError Returns: None """ outputdir = self.kwargs['outputdir'] overwr = self.kwargs['overwrite'] for track in os.listdir(self.kwargs['tempdir']): if os.path.exists(os.path.join(outputdir, track)): if overwr in ('n', 'N', 'y', 'Y', 'ask'): while True: msgdebug(warn=f"File already exists: " f"'{os.path.join(outputdir, track)}'") overwr = input("Overwrite [Y/n/always/never]? > ") if overwr in ('Y', 'y', 'n', 'N', 'always', 'never'): break msgdebug(err=f"Invalid option '{overwr}'") continue if overwr == 'never': msgdebug(info=("Do not overwrite any files because " "you specified 'never' option")) return None if overwr in ('y', 'Y', 'always', 'never', 'ask'): if overwr == 'always': msgdebug(info=("Overwrite existing file because " "you specified the 'always' option")) try: shutil.move(os.path.join(self.kwargs['tempdir'], track), os.path.join(outputdir, track)) except Exception as error: raise FFCueSplitterError(error) from error return None # ----------------------------------------------------------------# def do_operations(self): """ Automates the work in a temporary context using tempfile. Raises: FFCueSplitterError Returns: None """ with tempfile.TemporaryDirectory(suffix=None, prefix='ffcuesplitter_', dir=None) as tmpdir: self.kwargs['tempdir'] = tmpdir self.ffmpeg_arguments() msgdebug(info=(f"Temporary Target: '{self.kwargs['tempdir']}'")) count = 0 msgdebug(info="Extracting audio tracks (type Ctrl+c to stop):") for args, secs, title in zip(self.arguments, self.seconds, self.audiotracks): count += 1 msg(f'\nTRACK {count}/{len(self.audiotracks)} ' f'>> "{title["TITLE"]}.{self.outsuffix}" ...') self.processing(args, secs) if self.kwargs['dry'] is True: return msg('\n') msgdebug(info="...done exctracting") msgdebug(info="Move files to: ", tail=(f"\033[34m" f"'{os.path.abspath(self.kwargs['outputdir'])}'" f"\033[0m")) try: os.makedirs(self.kwargs['outputdir'], mode=0o777, exist_ok=True) except Exception as error: raise FFCueSplitterError(error) from error self.move_files_to_outputdir() # ----------------------------------------------------------------# def get_track_duration(self, tracks): """ Gets total duration of the source audio tracks for chunks calculation on the progress meter during ffmpeg executions. Given a total duration calculates the remains duration for the last track as well. This method is called by `cuefile_parser` method, Do not call this method directly. Raises: FFCueSplitterError Returns: tracks (list), all track data taken from the cue file. """ if self.testpatch: probe = {'format': {'duration': 6.000000}} else: filename = tracks[0].get('FILE') cmd = self.kwargs['ffprobe_cmd'] kwargs = {'loglevel': 'error', 'hide_banner': None} probe = ffprobe(filename, cmd=cmd, **kwargs) self.probedata.append(probe) time = [] for idx in enumerate(tracks): if idx[0] != len(tracks) - 1: # minus last trk = (tracks[idx[0] + 1]['START'] - tracks[idx[0]]['START']) / (44100) time.append(trk) if not time: last = (float(probe['format']['duration']) - tracks[0]['START'] / 44100) else: last = float(probe['format']['duration']) - sum(time) time.append(last) for keydur, remain in zip(tracks, time): keydur['DURATION'] = remain return tracks # ----------------------------------------------------------------# def deflacue_object_handler(self): """ Handles `deflacue.CueParser` data. Raises: FFCueSplitterError: if no source audio file found Returns: 'audiotracks' list object """ self.audiotracks = [] cd_info = self.cue.meta.data def sanitize(val: str) -> str: return val.replace('/', '').replace('\\','').replace('"', '') tracks = self.cue.tracks sourcenames = {k: [] for k in [str(x.file.path) for x in tracks]} for track in enumerate(tracks): track_file = track[1].file.path if not track_file.exists(): msgdebug(warn=(f'Source file `{track_file}` is not ' f'found. Track is skipped.')) if str(track_file) in sourcenames: sourcenames.pop(str(track_file)) if not sourcenames: raise FFCueSplitterError('No audio source files ' 'found!') continue filename = (f"{sanitize(track[1].title)}") data = {'FILE': str(track_file), **cd_info, **track[1].data} data['TITLE'] = filename data['START'] = track[1].start if track[1].end != 0: data['END'] = track[1].end if f"{data['FILE']}" in sourcenames.keys(): sourcenames[f'{data["FILE"]}'].append(data) for val in sourcenames.values(): self.audiotracks += self.get_track_duration(val) return self.audiotracks # ----------------------------------------------------------------# def check_cuefile(self): """ Cue file check """ filesuffix = os.path.splitext(self.kwargs['filename'])[1] isfile = os.path.isfile(self.kwargs['filename']) if not isfile or filesuffix not in ('.cue', '.CUE'): raise InvalidFileError(f"Invalid CUE sheet file: " f"'{self.kwargs['filename']}'") # ----------------------------------------------------------------# def open_cuefile(self, testpatch=None): """ Read cue file and start file parsing via deflacue package """ if testpatch: self.testpatch = True self.check_cuefile() curdir = os.getcwd() os.chdir(self.kwargs['dirname']) with open(self.kwargs['filename'], 'rb') as file: cuebyte = file.read() self.cue_encoding = chardet.detect(cuebyte) parser = CueParser.from_file(self.kwargs['filename'], encoding=self.cue_encoding['encoding']) self.cue = parser.run() self.deflacue_object_handler() os.chdir(curdir)