#!/usr/bin/env python
#
# Copyright (C) 2009-2011 W. Trevor King
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.

"""Mirror a tree of mp3/ogg/flac files with Ogg Vorbis versions.

Other target formats are also supported.  Current conversions:

* flac -> ogg
* flac -> wav -> mp3
* ogg -> wav -> flac
* ogg -> wav -> mp3
* mp3 -> wav -> flac
* mp3 -> wav -> ogg

External packages required for full functionality:

* id3v2_ (`id3v2`)
* lame_ (`lame`)
* flac_ (`flac`, `metaflac`)
* mpg123_ (`mpg123`)
* vorbis_ (`ogg123`, `oggenc`, `vorbiscomment`)

.. _id3v2: http://id3v2.sourceforge.net/
.. _lame: http://lame.sourceforge.net
.. _flac: http://flac.sourceforge.net
.. _mpg123: http://www.mpg123.org/
.. _vorbis: http://www.vorbis.com
"""

from hashlib import sha256 as _hash
import os
import os.path
import re as _re
import shutil
from subprocess import Popen, PIPE
from tempfile import mkstemp


__version__ = '0.2'

# Names exported by a star import.  `test` is deliberately left out: it
# is a command-line helper (see `--test`), not part of the library API.
__all__ = ['Converter', 'invoke']


# NOTE: every `print(...)` call in this file takes exactly one argument,
# so the file runs unchanged under both Python 2 (print statement with a
# parenthesised expression) and Python 3 (print function).


def invoke(args, stdin=None, expect=(0,)):
    """Run an external command and return `(status, stdout, stderr)`.

    Parameters
    ----------
    args : list of str
        Command and arguments, passed to `Popen` without a shell.
    stdin : str or None
        Text fed to the command's standard input.
    expect : sequence of int
        Exit statuses that count as success.

    Raises
    ------
    AssertionError
        If the exit status is not in `expect`.
    """
    print(' %s' % args)
    # universal_newlines=True makes the pipes text-mode, so `stdin` and
    # the captured output are `str` under both Python 2 and Python 3.
    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
              universal_newlines=True)
    stdout, stderr = p.communicate(stdin)
    status = p.wait()
    # Raised explicitly (rather than via `assert`) so the check survives
    # `python -O`; the exception type is unchanged for callers.
    if status not in expect:
        raise AssertionError(
            'invalid status %d from %s' % (status, args))
    return (status, stdout, stderr)


class Converter (object):
    """Recode audio files from `source_dir` to `target_dir`.

    `target_extension` sets the target encoding.

    Notes
    -----
    The `get_*_metadata` and `set_*_metadata` methods should pass
    metadata as a `dict` with key/value pairs standardised to match
    the list of Vorbis comment suggestions_ with lowercase keys.  The
    `date` field should be formatted `YYYY[-MM[-DD]]`.

    Conversions and metadata handlers are looked up by name
    (`convert_<source>_to_<target>`, `get_<ext>_metadata`,
    `set_<ext>_metadata`), so supporting a new format means adding
    methods that follow those naming patterns.

    .. _suggestions: http://www.xiph.org/vorbis/doc/v-comment.html
    """
    def __init__(self, source_dir, target_dir, target_extension='ogg',
                 cache_file=None, ignore_function=None):
        """Set up a converter; call `cleanup()` when finished.

        `cache_file` (optional) is a path used to remember finished
        conversions between runs.  `ignore_function` (optional) is
        called with each source path; a true result skips that file.
        """
        self.source_dir = source_dir
        self.target_dir = target_dir
        self._source_extensions = ['flac', 'mp3', 'ogg', 'wav']
        self._target_extension = target_extension
        self._cache_file = cache_file
        self._cache = self._read_cache()
        self._ignore_function = ignore_function
        # Scratch file for the intermediate wav of two-step conversions.
        # mkstemp returns an open descriptor; only the path is needed,
        # so close the descriptor instead of leaking it.
        fd, self._tempfile = mkstemp(prefix='mkogg-')
        os.close(fd)

    def cleanup(self):
        """Remove the scratch file and write the cache back to disk."""
        try:
            os.remove(self._tempfile)
        finally:
            # Save the cache even if the scratch file could not be
            # removed, so finished conversions are not forgotten.
            self._save_cache()

    def _read_cache(self):
        """Load the `key -> value` cache from `self._cache_file`.

        Returns an empty dict when no cache file is configured, when
        it cannot be opened, or when it was written by a different
        `__version__`.  Lines that are not a `key -> value` pair are
        skipped.

        Raises
        ------
        AssertionError
            If the file does not start with the mkogg cache header.
        """
        cache = {}
        if self._cache_file is None:
            return cache
        try:
            with open(self._cache_file, 'r') as f:
                line = f.readline()
                if not line.startswith('# mkogg cache version:'):
                    raise AssertionError(line)
                version = line.split(':', 1)[-1].strip()
                if version != __version__:
                    print('cache version mismatch: %s != %s' % (
                        version, __version__))
                    return cache  # old cache, ignore contents
                for line in f:
                    try:
                        key, value = [
                            x.strip() for x in line.split(' -> ')]
                    except ValueError:
                        continue  # malformed line, nothing to store
                    cache[key] = value
        except IOError:
            pass  # unreadable or missing cache: start from scratch
        return cache

    def _save_cache(self):
        """Write the cache to `self._cache_file` (no-op if unset)."""
        if self._cache_file is None:
            return
        with open(self._cache_file, 'w') as f:
            f.write('# mkogg cache version: %s\n' % __version__)
            for key, value in self._cache.items():
                f.write('%s -> %s\n' % (key, value))

    def run(self):
        """Walk `source_dir` and convert every recognised audio file.

        Files are matched by (case-insensitive) extension against the
        known source extensions; the directory layout is reproduced
        under `target_dir`.
        """
        self._makedirs(self.target_dir)
        for dirpath, dirnames, filenames in os.walk(self.source_dir):
            for filename in filenames:
                root, ext = os.path.splitext(filename)
                ext = ext.lower()
                if ext.startswith('.'):
                    ext = ext[1:]
                if ext not in self._source_extensions:
                    print('skip %s %s' % (filename, ext))
                    continue
                source_path = os.path.join(dirpath, filename)
                if (self._ignore_function is not None and
                        self._ignore_function(source_path)):
                    continue
                rel_path = os.path.relpath(dirpath, self.source_dir)
                target_path = os.path.join(
                    self.target_dir, rel_path,
                    '%s.%s' % (root, self._target_extension))
                target_dir = os.path.dirname(target_path)
                self._makedirs(target_dir)
                self._convert(source_path, target_path, ext)

    def _makedirs(self, target_dir):
        """Create `target_dir` (and parents) unless it already exists."""
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

    def _convert(self, source, target, ext):
        """Convert `source` (of type `ext`) into `target`.

        The conversion is skipped when the cache already maps this
        source (and target extension) to the current contents of
        `target`.  When source and target types match, the file is
        simply copied.
        """
        cache_key = self._cache_key(source)
        old_cache_value = self._cache.get(cache_key, None)
        if (old_cache_value is not None and
                old_cache_value == self._cache_value(target)):
            print('cached %s to %s' % (source, target))
            return
        print('convert %s to %s' % (source, target))
        if ext == self._target_extension:
            shutil.copy(source, target)
            return
        convert = getattr(
            self, 'convert_%s_to_%s' % (ext, self._target_extension))
        convert(source, target)
        # Converters flagged with `handles_metadata` take care of tags
        # themselves; for the others, copy the tags across explicitly.
        if not getattr(convert, 'handles_metadata', False):
            get_metadata = getattr(self, 'get_%s_metadata' % ext)
            metadata = get_metadata(source)
            set_metadata = getattr(
                self, 'set_%s_metadata' % self._target_extension)
            set_metadata(target, metadata)
        self._cache[cache_key] = self._cache_value(target)

    def _cache_key(self, source):
        """Return the cache key: source hash plus target extension."""
        return repr((self._file_hash(source), self._target_extension))

    def _cache_value(self, target):
        """Return the cache value: the hash of the target file."""
        return self._file_hash(target)

    def _file_hash(self, filename):
        """Return the SHA-256 hex digest of `filename`, or `None`.

        `None` is returned when the file cannot be read.

        Examples
        --------
        >>> c = Converter(None, None)
        >>> h = c._file_hash(__file__)
        >>> len(h)
        64
        >>> c._file_hash('/highly/unlikely/to/exist') is None
        True
        >>> c.cleanup()
        """
        h = _hash()
        chunk_size = 2**20  # 1 MiB
        try:
            with open(filename, 'rb') as f:
                # Read in chunks so large files are never fully in memory.
                for chunk in iter(lambda: f.read(chunk_size), b''):
                    h.update(chunk)
        except IOError:
            return None
        return str(h.hexdigest())

    def _parse_date(self, date):
        """Parse `date` (`YYYY[-MM[-DD]]`), returning `[year, month, day]`.

        Missing fields are returned as `None`.

        Raises
        ------
        AssertionError
            If `date` has more than three `-`-separated fields.

        Examples
        --------
        >>> c = Converter(None, None)
        >>> c._parse_date('2010')
        ['2010', None, None]
        >>> c._parse_date('2010-11')
        ['2010', '11', None]
        >>> c._parse_date('2010-11-16')
        ['2010', '11', '16']
        >>> c.cleanup()
        """
        fields = date.split('-')
        if len(fields) > 3:
            raise AssertionError(date)
        return fields + [None] * (3 - len(fields))

    def _parse_id3v2_comments(self, stdout):
        """Parse ID3v2 tags from `id3v2 --list` output.

        Only the section following the `id3v2 tag info` line is read.
        Frames listed in `drop_keys`, frames with no Vorbis equivalent
        and lines without a `key: value` shape are skipped.

        Examples
        --------
        >>> from pprint import pprint
        >>> c = Converter(None, None)
        >>> metadata = c._parse_id3v2_comments('\\n'.join([
        ...     'id3v1 tag info for src/03-Drive_My_Car.mp3:',
        ...     'Title  : The Famous Song  Artist: No One You Know',
        ...     'Album  : The Famous Album  Year: 1965, Genre: Rock (17)',
        ...     'Comment:   Track: 7',
        ...     'id3v2 tag info for src/03-Drive_My_Car.mp3:',
        ...     'TALB (Album/Movie/Show title): The Famous Album',
        ...     'TPE1 (Lead performer(s)/Soloist(s)): No One You Know',
        ...     'TIT2 (Title/songname/content description): The Famous Song',
        ...     'TYER (Year): 1965',
        ...     'TCON (Content type): Rock (17)',
        ...     'TRCK (Track number/Position in set): 07/14']))
        >>> pprint(metadata)  # doctest: +REPORT_UDIFF
        {'album': 'The Famous Album',
         'artist': 'No One You Know',
         'date': '1965',
         'genre': 'Rock',
         'title': 'The Famous Song',
         'tracknumber': '07',
         'tracktotal': '14'}
        >>> c.cleanup()
        """
        metadata = {}
        # ID3v2 frame id -> Vorbis comment field name.
        vorbis_keys = {
            'comm': 'comment',
            'talb': 'album',
            'tcom': 'composer',
            'tcon': 'genre',
            'tcop': 'copyright',
            'tit2': 'title',
            'tpe1': 'artist',
            'tpe2': 'accompaniment',
            'tpe3': 'conductor',
            'tpos': 'part of set',
            'tpub': 'organization',
            'trck': 'tracknumber',
            'tyer': 'date',
            }
        # Frames that are intentionally not carried over.
        drop_keys = [
            'apic',  # attached picture
            'geob',  # general encapsulated object
            'ncon',  # ?
            'pcnt',  # play counter (incremented with each play)
            'priv',  # private
            'tco',   # content type
            'tcp',   # frame?
            'tenc',  # encoded by
            'tflt',  # file type
            'tope',  # original artist (e.g. for a cover)
            'tlen',  # length (in milliseconds)
            'tmed',  # media type
            'wxxx',  # user defined URL
            ]
        # Three-letter frame ids -> their four-letter equivalents.
        key_translations = {
            'com': 'comm',
            'ten': 'tenc',
            'tal': 'talb',
            'tcm': 'tcom',
            'tt2': 'tit2',
            'tp1': 'tpe1',
            'trk': 'trck',
            'tye': 'tyer',
            }
        in_v2 = False
        for line in stdout.splitlines():
            if not in_v2:
                if line.startswith('id3v2 tag info'):
                    in_v2 = True
                continue
            if ':' not in line:
                continue  # not a `key: value` line
            key, value = [x.strip() for x in line.split(':', 1)]
            if value.lower() == 'no id3v1 tag':
                continue
            key_fields = key.split()
            if not key_fields:
                continue  # empty key, nothing to map
            short_key = key_fields[0].lower()
            short_key = key_translations.get(short_key, short_key)
            if short_key in drop_keys:
                continue
            if short_key not in vorbis_keys:
                # An unrecognised frame should not abort the whole run.
                print('skip unknown id3v2 frame %s' % key)
                continue
            v_key = vorbis_keys[short_key]
            if v_key == 'genre':
                # 'Rock (17)' -> 'Rock'
                value = value.rsplit('(', 1)[0].strip()
            elif v_key == 'tracknumber' and '/' in value:
                # '07/14' -> track '07' of '14'
                value, total = value.split('/', 1)
                metadata['tracktotal'] = total
            metadata[v_key] = value
        return metadata

    def _parse_vorbis_comments(self, stdout):
        """Parse Vorbis comments (`KEY=value` lines) into a dict.

        Keys are lowercased; values are kept verbatim.

        Examples
        --------
        >>> from pprint import pprint
        >>> c = Converter(None, None)
        >>> metadata = c._parse_vorbis_comments('\\n'.join([
        ...     'ARTIST=No One You Know',
        ...     'ALBUM=The Famous Album',
        ...     'TITLE=The Famous Song',
        ...     'DATE=1965',
        ...     'GENRE=Rock',
        ...     'TRACKNUMBER=07',
        ...     'TRACKTOTAL=14',
        ...     'CDDB=af08640e']))
        >>> pprint(metadata)  # doctest: +REPORT_UDIFF
        {'album': 'The Famous Album',
         'artist': 'No One You Know',
         'cddb': 'af08640e',
         'date': '1965',
         'genre': 'Rock',
         'title': 'The Famous Song',
         'tracknumber': '07',
         'tracktotal': '14'}
        >>> c.cleanup()
        """
        metadata = {}
        for line in stdout.splitlines():
            key, value = line.split('=', 1)
            metadata[key.lower()] = value
        return metadata

    def _format_vorbis_comments(self, metadata):
        """Format `metadata` as sorted `KEY=value` lines."""
        return '\n'.join(
            ['%s=%s' % (k.upper(), v)
             for k, v in sorted(metadata.items())])

    # Conversions.  `_convert` looks these up by name; two-step
    # conversions go through the scratch wav file `self._tempfile`.

    def convert_flac_to_mp3(self, source, target):
        """flac -> wav -> mp3."""
        self.convert_flac_to_wav(source, self._tempfile)
        self.convert_wav_to_mp3(self._tempfile, target)

    def convert_flac_to_wav(self, source, target):
        """Decode `source` to wav with `ogg123`."""
        invoke(['ogg123', '-d', 'wav', '-f', target, source])

    def convert_flac_to_ogg(self, source, target):
        """Encode `source` to Ogg Vorbis with `oggenc`."""
        invoke(['oggenc', '--quiet', '-q', '3', source, '-o', target])
    # Tells `_convert` not to copy the tags separately for this step.
    convert_flac_to_ogg.handles_metadata = True

    def convert_mp3_to_flac(self, source, target):
        """mp3 -> wav -> flac."""
        self.convert_mp3_to_wav(source, self._tempfile)
        self.convert_wav_to_flac(self._tempfile, target)

    def convert_mp3_to_ogg(self, source, target):
        """mp3 -> wav -> ogg."""
        self.convert_mp3_to_wav(source, self._tempfile)
        self.convert_wav_to_ogg(self._tempfile, target)

    def convert_mp3_to_wav(self, source, target):
        """Decode `source` to wav with `mpg123`."""
        invoke(['mpg123', '-w', target, source])

    def convert_ogg_to_flac(self, source, target):
        """ogg -> wav -> flac."""
        self.convert_ogg_to_wav(source, self._tempfile)
        self.convert_wav_to_flac(self._tempfile, target)

    def convert_ogg_to_mp3(self, source, target):
        """ogg -> wav -> mp3 (same commands as the flac route)."""
        self.convert_flac_to_mp3(source, target)

    def convert_ogg_to_wav(self, source, target):
        """Decode `source` to wav (same command as the flac route)."""
        self.convert_flac_to_wav(source, target)

    def convert_wav_to_flac(self, source, target):
        """Encode `source` to flac with `flac`."""
        invoke(['flac', '-o', target, source])

    def convert_wav_to_mp3(self, source, target):
        """Encode `source` to mp3 with `lame`."""
        invoke(['lame', '--quiet', '-V', '4', source, target])

    def convert_wav_to_ogg(self, source, target):
        """Encode `source` to Ogg Vorbis (same command as flac -> ogg)."""
        self.convert_flac_to_ogg(source, target)

    # Metadata readers: return a dict in the Vorbis-style convention
    # described in the class docstring.

    def get_flac_metadata(self, source):
        """Read tags from a flac file via `metaflac`."""
        status, stdout, stderr = invoke(
            ['metaflac', '--export-tags-to=-', source])
        return self._parse_vorbis_comments(stdout)

    def get_mp3_metadata(self, source):
        """Read tags from an mp3 file via `id3v2 --list`."""
        status, stdout, stderr = invoke(
            ['id3v2', '--list', source])
        return self._parse_id3v2_comments(stdout)

    def get_ogg_metadata(self, source):
        """Read tags from an ogg file via `vorbiscomment --list`."""
        status, stdout, stderr = invoke(
            ['vorbiscomment', '--list', source])
        return self._parse_vorbis_comments(stdout)

    def get_wav_metadata(self, source):
        """Return an empty dict; no tags are read from wav files."""
        return {}

    # Metadata writers: take a dict in the same convention.

    def set_flac_metadata(self, target, metadata):
        """Write `metadata` to a flac file via `metaflac`."""
        stdin = self._format_vorbis_comments(metadata)
        invoke(['metaflac', '--import-tags-from=-', target], stdin=stdin)

    def set_mp3_metadata(self, target, metadata):
        """Write `metadata` to an mp3 file via `id3v2`.

        Only album, artist, title, date (year), genre and track
        number/total are written.  Genres not reported by
        `id3v2 --list-genres` are stored as genre 12 ("Other").
        """
        args = ['id3v2']
        for key, arg in [('album', '--album'),
                         ('artist', '--artist'),
                         ('title', '--song')]:
            if key in metadata:
                args.extend([arg, metadata[key]])
        if 'date' in metadata:
            year, month, day = self._parse_date(metadata['date'])
            args.extend(['--year', year])
        if 'genre' in metadata:
            genre = metadata['genre']
            if not hasattr(self, '_id3v1_genres'):
                # Build the name -> number table once per converter.
                status, stdout, stderr = invoke(['id3v2', '--list-genres'])
                genres = {}
                for line in stdout.splitlines():
                    if ':' not in line:
                        continue  # not a `number: name` line
                    num, name = [x.strip() for x in line.split(':', 1)]
                    genres[name.lower()] = num
                self._id3v1_genres = genres
            # Genre 12 = "Other"
            num = self._id3v1_genres.get(genre.lower(), '12')
            args.extend(['--genre', num])
        if 'tracknumber' in metadata:
            track = metadata['tracknumber']
            if 'tracktotal' in metadata:
                track = '%s/%s' % (track, metadata['tracktotal'])
            args.extend(['--track', track])
        args.append(target)
        invoke(args)

    def set_ogg_metadata(self, target, metadata):
        """Write `metadata` to an ogg file via `vorbiscomment`."""
        stdin = self._format_vorbis_comments(metadata)
        invoke(['vorbiscomment', '--write', target], stdin=stdin)

    def set_wav_metadata(self, target, metadata):
        """Do nothing; no tags are written to wav files."""
        pass


def test():
    """Run this module's doctests; return the failure count (max 127).

    The result is meant to be used as a process exit status.
    """
    import doctest
    results = doctest.testmod()
    # Cap rather than wrap: `failed % 127` would report success (0) for
    # exactly 127 failures.
    return min(results.failed, 127)


if __name__ == '__main__':
    import optparse
    import sys

    usage = '%prog [options] source-dir target-dir'
    epilog = __doc__
    p = optparse.OptionParser(usage=usage, epilog=epilog)
    # Print the module docstring verbatim instead of letting optparse
    # re-wrap it.
    p.format_epilog = lambda formatter: epilog+'\n'
    p.add_option('-t', '--target-extension', dest='ext',
                 default='ogg', metavar='EXT',
                 help='Conversion target type (e.g. flac, mp3) (%default)')
    p.add_option('-c', '--cache', dest='cache', metavar='PATH',
                 help=('Save conversion hashes in a cache file to avoid '
                       'repeated previous conversions.'))
    p.add_option('-i', '--ignore', dest='ignore', metavar='REGEXP',
                 help=('Ignore source paths matching REGEXP.'))
    p.add_option('--test', dest='test', action='store_true', default=False,
                 help='Run internal tests and exit')

    options, args = p.parse_args()

    if options.test:
        sys.exit(test())

    if len(args) != 2:
        p.error('expected source-dir and target-dir, got %d argument(s)'
                % len(args))

    if options.ignore is not None:
        ignore_regexp = _re.compile(options.ignore)
        ignore_function = ignore_regexp.match
    else:
        ignore_function = None

    source_dir, target_dir = args
    c = Converter(source_dir, target_dir, target_extension=options.ext,
                  cache_file=options.cache, ignore_function=ignore_function)
    try:
        c.run()
    finally:
        c.cleanup()