Mercurial > ~darius > hgwebdir.cgi > amakode
view amakode.py @ 8:e2ef50b914fd default tip
Actually we need Python 2.4 for subprocess.
author | darius@inchoate.localdomain |
---|---|
date | Mon, 19 Nov 2007 17:25:28 +1030 |
parents | bdb8597191dc |
children |
line wrap: on
line source
#!/usr/bin/env python

############################################################################
# Transcoder for Amarok
# - Add support for tagging (jens.zurheide@gmx.de)
# - Fixed typo in lame encoder (tcuya from kde-apps.org)
# - Made setting maxjobs easier, although Amarok doesn't appear to issue
#   multiple requests :(
#
# Depends on: Python 2.4
#             tagpy (optional)
#
# The only user serviceable parts are the encode/decode tables (in the
# TranscodeJob class) and the number of concurrent jobs to run (the maxjobs
# argument in amaKode.__init__)
#
# The optional module tagpy (http://news.tiker.net/software/tagpy) is used
# for tag information processing. This allows for writing tags into the
# transcoded files.
#
# Mercurial repo available at http://www.dons.net.au/~darius/hgwebdir.cgi/amakode/
#
############################################################################
#
# Copyright (C) 2007 Daniel O'Connor. All rights reserved.
# Copyright (C) 2007 Jens Zurheide. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
############################################################################

__version__ = "1.3"

import os
import sys
import string
import signal
import logging
import select
import subprocess
import tempfile
from logging.handlers import RotatingFileHandler
import urllib
import re

# ConfigParser, urlparse and urllib.urlopen live under different names in
# Python 3; fall back to those so the module can still be imported there.
try:
    import ConfigParser
except ImportError:
    import configparser as ConfigParser

try:
    import urlparse
    urlopen = urllib.urlopen
except ImportError:
    import urllib.parse as urlparse
    from urllib.request import urlopen

try:
    import tagpy
    have_tagpy = True
except ImportError:
    have_tagpy = False

# Module wide logger.  initLog() attaches the handlers and rebinds this name
# to the same logger; fetching it here means the classes below can log even
# when initLog() has not been called (eg when imported as a module).
log = logging.getLogger("amaKode")


class tagpywrap(dict):
    """ Dictionary of the tags tagpy finds in the file behind a URL

    Only tags which are actually set are present as keys: empty text
    fields and numeric fields equal to 0 are left out.
    """

    textfields = ['album', 'artist', 'title', 'comment', 'genre']
    numfields = ['year', 'track']
    allfields = textfields + numfields

    def __init__(self, url):
        """ Read the tags of url (opened with urlopen to find the file name) """
        f = urlopen(url)
        try:
            self.tagInfo = tagpy.FileRef(f.fp.name).tag()
        finally:
            # Don't leak the handle if tagpy can't make sense of the file
            f.close()

        for field in self.textfields:
            value = getattr(self.tagInfo, field).strip()
            if value != "":
                self[field] = value

        for field in self.numfields:
            value = getattr(self.tagInfo, field)
            if value != 0:
                self[field] = value


class QueueMgr(object):
    """ Runs queued jobs, keeping at most maxjobs of them active at once

    A job is any object with start() and isfinished() methods.
    """

    def __init__(self, callback = None, maxjobs = 2):
        """ callback (if given) is called with each job once it has finished """
        self.callback = callback
        self.maxjobs = maxjobs
        # Per instance lists - as class attributes they would be shared by
        # every QueueMgr
        self.queuedjobs = []
        self.activejobs = []

    def add(self, job):
        """ Queue job, it is started by a later call to poll() """
        log.debug("Job added")
        self.queuedjobs.append(job)

    def poll(self):
        """ Poll active jobs and check if we should make a new job active """
        # Walk over a copy because finished jobs are removed as we go
        for j in list(self.activejobs):
            if j.isfinished():
                log.debug("job is done")
                self.activejobs.remove(j)
                if self.callback is not None:
                    self.callback(j)

        # Fill every free slot, not just the ones freed by this poll
        while self.queuedjobs and len(self.activejobs) < self.maxjobs:
            newjob = self.queuedjobs.pop(0)
            newjob.start()
            self.activejobs.append(newjob)

    def isidle(self):
        """ Returns true if both queues are empty """
        return len(self.queuedjobs) == 0 and len(self.activejobs) == 0


class TranscodeJob(object):
    """ Transcodes one URL by piping a decoder process into an encoder process

    The result is written to a temporary file (outfname / outurl).  After
    the job has finished errormsg is None on success, otherwise it holds a
    description of the failure.
    """

    # Programs used to decode (to a wav stream)
    decode = {}
    decode["mp3"] = ["mpg123", "-w", "-", "-"]
    decode["ogg"] = ["ogg123", "-d", "wav", "-f", "-", "-"]
    # XXX: this is really fugly but faad refuses to read from a pipe
    decode["mp4"] = ["env", "MPLAYER_VERBOSE=-100", "mplayer", "-ao", "pcm:file=/dev/stdout", "-"]
    decode["m4a"] = decode["mp4"]
    decode["flac"] = ["flac", "-d", "-c", "-"]

    # Programs used to encode (from a wav stream)
    encode = {}
    encode["mp3"] = ["lame", "--abr", "128", "-", "-"]
    encode["ogg"] = ["oggenc", "-q", "2", "-"]
    encode["mp4"] = ["faac", "-wo", "/dev/stdout", "-"]
    encode["m4a"] = encode["mp4"]
    # XXX: can't encode flac - its wav parser chokes on mpg123's output, it does work
    # OK if passed through sox but we can't do that. If you really want flac modify
    # the code & send me a diff or write a wrapper shell script :)
    #encode["flac"] = ["flac", "-c", "-"]

    # Options for output programs to store ID3 tag information
    tagopt = {}
    tagopt["mp3"] = { "album" : "--tl", "artist" : "--ta", "title" : "--tt", "track" : "--tn" }
    # oggenc takes the title with -t (-a is the artist)
    tagopt["ogg"] = { "album" : "-l", "artist" : "-a", "title" : "-t", "track" : "-N" }
    tagopt["mp4"] = { "album" : "--album", "artist" : "--artist", "title" : "--title", "track" : "--track" }
    # m4a uses the mp4 encoder so it takes the same tag options
    tagopt["m4a"] = tagopt["mp4"]
    #tagopt["flac"] = { "album" : "-Talbum=%s", "artist" : "-Tartist=%s", "title" : "-Ttitle=%s", "track" : "-Ttracknumber=%s" }

    def __init__(self, _inurl, _tofmt):
        """ Prepare to transcode _inurl into the format _tofmt

        The input format is taken from the extension of _inurl.  Raises
        KeyError if there is no decoder for the input format (including a
        URL without an extension) or no encoder for _tofmt.
        """
        self.errormsg = None
        # All of these are filled in by start()
        self.inputfile = None
        self.outfd = None
        self.outfname = None
        self.errfh = None
        self.errfname = None
        self.outurl = None
        self.decoder = None
        self.encoder = None

        log.debug("Creating job")
        self.inurl = _inurl
        self.tofmt = _tofmt.lower()

        # No "." at all means no extension, report that the same way as an
        # unknown one
        pieces = self.inurl.rsplit(".", 1)
        if len(pieces) == 2:
            self.inext = pieces[1].lower()
        else:
            self.inext = ""

        if self.inext in self.decode:
            log.debug("can decode with " + str(self.decode[self.inext]))
        else:
            log.debug("unable to decode " + self.inext)
            raise KeyError("no available decoder")

        if self.tofmt in self.encode:
            log.debug("can encode with " + str(self.encode[self.tofmt]))
        else:
            log.debug("unable to encode " + self.tofmt)
            raise KeyError("no available encoder")

    def start(self):
        """ Start the decoder and encoder processes

        Never raises: if anything goes wrong the problem is stored in
        errormsg, which makes isfinished() report the job as done.
        """
        log.debug("Starting job")
        try:
            self.inputfile = urlopen(self.inurl)
            self.outfd, self.outfname = tempfile.mkstemp(prefix="transcode-", suffix="." + self.tofmt)
            #self.outfname = string.join(string.rsplit(self.inurl, ".")[:-1] + [self.tofmt], ".")

            self.errfh, self.errfname = tempfile.mkstemp(prefix="transcode-", suffix=".log")
            self.outurl = urlparse.urlunsplit(["file", None, self.outfname, None, None])
            log.debug("Outputting to " + self.outfname + " (" + self.outurl + ")")
            log.debug("Errors to " + self.errfname)

            # assemble command line for encoder (a copy, the class wide
            # table must not be modified)
            encoder = list(self.encode[self.tofmt])
            if have_tagpy and self.tofmt in self.tagopt:
                # Tags are optional, transcode without them if they can't
                # be read
                tagged = list(encoder)
                try:
                    self._addtags(tagged)
                    encoder = tagged
                except Exception:
                    log.debug("Unable to read tags, continuing without - " + str(sys.exc_info()[1]))

            log.debug("decoder -> " + str(self.decode[self.inext]))
            log.debug("encoder -> " + str(encoder))
            self.decoder = subprocess.Popen(self.decode[self.inext], stdin=self.inputfile, stdout=subprocess.PIPE, stderr=self.errfh)
            self.encoder = subprocess.Popen(encoder, stdin=self.decoder.stdout, stdout=self.outfd, stderr=self.errfh)
            # The children have their own copies now.  Closing ours stops
            # descriptors piling up and lets the decoder notice if the
            # encoder goes away.
            self._closehandles()
            log.debug("Processes connected")
        except Exception:
            # sys.exc_info() rather than "except ... as" keeps the syntax
            # valid for Python 2.4
            err = sys.exc_info()[1]
            log.debug("Failed to start - " + str(err))
            self.errormsg = str(err)
            self._closehandles()
            self._reapdecoder()
            self._removeoutput()

    def isfinished(self):
        """ Returns True once the job has ended (successfully or not)

        Success is decided by the exit status of the encoder.  On success
        the error log is deleted, on failure the output file is deleted
        and errormsg points at the error log.
        """
        if self.errormsg is not None:
            return True

        rtn = self.encoder.poll()
        if rtn is None:
            return False

        self._reapdecoder()
        if rtn == 0:
            os.unlink(self.errfname)
            self.errormsg = None
        else:
            log.debug("error in transcode, please review " + self.errfname)
            self.errormsg = "Unable to transcode, please review " + self.errfname
            self._removeoutput()

        return True

    def _addtags(self, encoder):
        """ Insert the tag options for the input file into the encoder command """
        options = self.tagopt[self.tofmt]
        taginfo = tagpywrap(self.inurl)
        for field in taginfo.allfields:
            if field not in taginfo or field not in options:
                continue

            value = taginfo[field]
            if field in taginfo.numfields:
                # Command line arguments have to be strings
                value = str(value)
            opt = options[field]
            log.debug(" %s = %s %s" % (field, opt, value))
            # If we have a substitution, make it. If not append the info
            # as a separate arg. Note that the tag options are passed in
            # as the second option because a lot of programs don't parse
            # options after their file list.
            if '%s' in opt:
                encoder.insert(1, opt.replace('%s', value))
            else:
                encoder.insert(1, opt)
                encoder.insert(2, value)

    def _closehandles(self):
        """ Close our copies of the files handed to the child processes """
        if self.decoder is not None and self.decoder.stdout is not None:
            self.decoder.stdout.close()

        if self.inputfile is not None:
            self.inputfile.close()
            self.inputfile = None

        for fd in (self.outfd, self.errfh):
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass
        self.outfd = None
        self.errfh = None

    def _reapdecoder(self):
        """ Make sure the decoder (if started) has exited and been collected """
        if self.decoder is None:
            return

        if self.decoder.poll() is None:
            # This is only called once nothing reads the decoder's output
            # any more, so there is no point letting it run on
            try:
                os.kill(self.decoder.pid, signal.SIGTERM)
            except OSError:
                pass
            self.decoder.wait()
        log.debug("decoder exited with status " + str(self.decoder.returncode))

    def _removeoutput(self):
        """ Delete the output file (if any), ignoring failure to do so """
        if self.outfname is not None:
            try:
                os.unlink(self.outfname)
            except OSError:
                pass

############################################################################ # amaKode ############################################################################ class amaKode(object): """ The main application""" def __init__(self, args): """ Main loop waits for something to do then does it """ log.debug("Started.") if (have_tagpy): log.debug("Using tagpy") else: log.debug("Warning: tagpy is unavailable") self.readSettings() self.queue = QueueMgr(callback = self.notify, maxjobs = 1) while True: # Check for finished jobs, etc self.queue.poll() # Check if there's anything waiting on stdin res = select.select([sys.stdin.fileno()], [], [], 0.1) if (sys.stdin.fileno() in res[0]): # Let's hope we got a whole line or we stall here line = sys.stdin.readline() if line: self.customEvent(line) else: break def readSettings(self): """ Reads settings from configuration file """ try: foovar = config.get("General", "foo") except: log.debug("No config file found, using defaults.") def customEvent(self, string): """ Handles notifications """ #log.debug("Received notification: " + str(string)) if string.find("transcode") != -1: self.transcode(str(string)) if string.find("quit") != -1: self.quit() def transcode(self, line): """ Called when requested to transcode a track """ args = string.split(line) if (len(args) != 3): log.debug("Invalid transcode command") return log.debug("transcoding " + args[1] + " to " + args[2]) try: newjob = TranscodeJob(args[1], args[2]) except: log.debug("Can't create transcoding job") os.system("dcop amarok mediabrowser transcodingFinished " + re.escape(args[1]) + "\"\"") self.queue.add(newjob) def notify(self, job): """ Report to amarok that the job is done """ if (job.errormsg == None): log.debug("Job " + job.inurl + " completed successfully") os.system("dcop amarok mediabrowser transcodingFinished " + re.escape(job.inurl) + " " + re.escape(job.outurl)) else: log.debug("Job " + job.inurl + " failed - " + job.errormsg) os.system("dcop amarok mediabrowser 
transcodingFinished " + re.escape(job.inurl) + "\"\"") def quit(self): log.debug("quitting") sys.exit() ############################################################################ def debug(message): """ Prints debug message to stdout """ log.debug(message) def onStop(signum, stackframe): """ Called when script is stopped by user """ log.debug("signalled exit") sys.exit() def initLog(): # Init our logging global log log = logging.getLogger("amaKode") # Default to warts and all logging log.setLevel(logging.DEBUG) # Log to this file logfile = logging.handlers.RotatingFileHandler(filename = "/tmp/amakode.log", maxBytes = 10000, backupCount = 3) # And stderr logstderr = logging.StreamHandler() # Format it nicely formatter = logging.Formatter("[%(name)s] %(message)s") # Glue it all together logfile.setFormatter(formatter) logstderr.setFormatter(formatter) log.addHandler(logfile) log.addHandler(logstderr) return(log) def reportJob(job): """ Report to amarok that the job is done """ if (job.errormsg == None): log.debug("Job " + job.inurl + " completed successfully") log.debug("dcop amarok mediabrowser transcodingFinished " + job.inurl + " " + job.outurl) else: log.debug("Job " + job.inurl + " failed - " + job.errormsg) log.debug("dcop amarok mediabrowser transcodingFinished " + job.inurl + "\"\"") if __name__ == "__main__": initLog() signal.signal(signal.SIGINT, onStop) signal.signal(signal.SIGHUP, onStop) signal.signal(signal.SIGTERM, onStop) if 1: # Run normal application app = amaKode(sys.argv) else: # Quick test case q = QueueMgr(reportJob) j = TranscodeJob("file:///tmp/test.mp3", "ogg") q.add(j) j2 = TranscodeJob("file:///tmp/test2.mp3", "m4a") q.add(j2) while not q.isidle(): q.poll() res = select.select([], [], [], 1) log.debug("jobs all done")