diff options
Diffstat (limited to 'module/PluginThread.py')
-rw-r--r-- | module/PluginThread.py | 170 |
1 files changed, 115 insertions, 55 deletions
diff --git a/module/PluginThread.py b/module/PluginThread.py index e0e3b17c9..a44981c52 100644 --- a/module/PluginThread.py +++ b/module/PluginThread.py @@ -30,8 +30,10 @@ from types import MethodType from pycurl import error -from module.PyFile import PyFile -from module.plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload +from PyFile import PyFile +from plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload +from common.packagetools import parseNames +from remote.thriftbackend.thriftgen.pyload.ttypes import OnlineStatus class PluginThread(Thread): @@ -46,7 +48,8 @@ class PluginThread(Thread): def writeDebugReport(self, pyfile): - dump = "pyLoad %s Debug Report of %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (self.m.core.api.getServerVersion(), pyfile.pluginname, format_exc()) + dump = "pyLoad %s Debug Report of %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( + self.m.core.api.getServerVersion(), pyfile.pluginname, format_exc()) tb = exc_info()[2] stack = [] @@ -55,18 +58,17 @@ class PluginThread(Thread): tb = tb.tb_next for frame in stack[1:]: - dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, - frame.f_code.co_filename, - frame.f_lineno) + frame.f_code.co_filename, + frame.f_lineno) for key, value in frame.f_locals.items(): dump += "\t%20s = " % key try: dump += pformat(value) + "\n" except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n" - + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + del frame del stack #delete it just to be sure... @@ -80,7 +82,7 @@ class PluginThread(Thread): try: dump += pformat(attr) + "\n" except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n" + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" dump += "\nPYFILE OBJECT DUMP: \n\n" @@ -91,14 +93,11 @@ class PluginThread(Thread): try: dump += pformat(attr) + "\n" except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n" - + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" if pyfile.pluginname in self.m.core.config.plugin: dump += "\n\nCONFIG: \n\n" - dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) +"\n" - - + dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) + "\n" dump_name = "debug_%s_%s.txt" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S")) self.m.core.log.info("Debug Report written to %s" % dump_name) @@ -142,7 +141,6 @@ class DownloadThread(PluginThread): return True try: - if not pyfile.hasPlugin(): continue #this pyfile was deleted while queueing @@ -154,7 +152,6 @@ class DownloadThread(PluginThread): pyfile.plugin.preprocessing(self) except NotImplementedError: - self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) pyfile.setStatus("failed") pyfile.error = "Plugin does not work" @@ -164,9 +161,9 @@ class DownloadThread(PluginThread): except Abort: try: self.m.log.info(_("Download aborted: %s") % pyfile.name) - except : + except: pass - + pyfile.setStatus("aborted") self.clean(pyfile) @@ -182,14 +179,12 @@ class DownloadThread(PluginThread): continue except Retry, e: - reason = e.args[0] - self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name" : pyfile.name, "msg": reason}) + self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) self.queue.put(pyfile) continue except Fail, e: - msg = e.args[0] if msg == "offline": @@ -218,7 +213,7 @@ class DownloadThread(PluginThread): if code in (7, 18, 28, 52, 56): self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) wait = time() + 60 - + pyfile.waitUntil = wait pyfile.setStatus("waiting") while time() < wait: @@ -247,10 +242,10 @@ class DownloadThread(PluginThread): continue except SkipDownload, e: - pyfile.setStatus("skipped") - self.m.log.info(_("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message}) + self.m.log.info( + _("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message}) self.clean(pyfile) @@ -279,7 +274,6 @@ class DownloadThread(PluginThread): self.m.core.files.checkAllLinksProcessed() exc_clear() - self.m.log.info(_("Download finished: %s") % pyfile.name) #pyfile.plugin.req.clean() @@ -302,7 +296,6 @@ class DownloadThread(PluginThread): self.put("quit") - class DecrypterThread(PluginThread): """thread for decrypting""" @@ -330,12 +323,10 @@ class DecrypterThread(PluginThread): self.active.plugin.preprocessing(self) except NotImplementedError: - self.m.log.error(_("Plugin %s is missing a function.") % self.active.pluginname) return except Fail, e: - msg = e.args[0] if msg == "offline": @@ -343,28 +334,25 @@ class DecrypterThread(PluginThread): self.m.log.warning(_("Download is offline: %s") % self.active.name) else: self.active.setStatus("failed") - self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % { "name" : self.active.name, "msg":msg }) + self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": msg}) self.active.error = msg return - + except Abort: - self.m.log.info(_("Download aborted: %s") % pyfile.name) pyfile.setStatus("aborted") - + return except Retry: - self.m.log.info(_("Retrying %s") % self.active.name) retry = True return self.run() except Exception, e: - self.active.setStatus("failed") - self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % { "name" : self.active.name, "msg" :str(e) }) + self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": str(e)}) self.active.error = str(e) if self.m.core.debug: @@ -391,6 +379,7 @@ class DecrypterThread(PluginThread): if not retry: pyfile.delete() + class HookThread(PluginThread): """thread for hooks""" @@ -403,7 +392,7 @@ class HookThread(PluginThread): self.active = pyfile m.localThreads.append(self) - + if isinstance(pyfile, PyFile): pyfile.setStatus("processing") @@ -411,22 +400,26 @@ class HookThread(PluginThread): def run(self): self.f(self.active) - + self.m.localThreads.remove(self) if isinstance(self.active, PyFile): self.active.finishIfDone() class InfoThread(PluginThread): - #---------------------------------------------------------------------- - def __init__(self, manager, data, pid): + def __init__(self, manager, data, pid=-1, rid=-1): """Constructor""" PluginThread.__init__(self, manager) self.data = data self.pid = pid # package id # [ .. (name, plugin) .. ] + + self.rid = rid #result id + + self.cache = [] #accumulated data + self.start() #---------------------------------------------------------------------- @@ -441,21 +434,88 @@ class InfoThread(PluginThread): else: plugins[plugin] = [url] - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - if hasattr(plugin, "getInfo"): - try: - self.m.core.log.debug("Run Info Fetching for %s" % pluginname) - for result in plugin.getInfo(urls): - #result = [ .. (name, size, status, url) .. ] - if not type(result) == list: result = [result] - self.m.core.files.updateFileInfo(result, self.pid) + #directly write to database + if self.pid > -1: + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + if hasattr(plugin, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) + self.m.core.files.save() - self.m.core.log.debug("Finished Info Fetching for %s" % pluginname) + else: #post the results - self.m.core.files.save() - except Exception, e: - self.m.core.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % {"name": pluginname, "err": str(e)} ) - if self.m.core.debug: - print_exc() - + self.m.infoResults[self.rid] = {} + + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + if hasattr(plugin, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + + #force to process cache + if self.cache: + self.updateResult(pluginname, [], True) + + else: + #generate default result + pass + + self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = [] + + + def updateDB(self, plugin, result): + self.m.core.files.updateFileInfo(result, self.pid) + + def updateResult(self, plugin, result, force=False): + #parse package name and generate result + #accumulate results + + self.cache.extend(result) + + if len(self.cache) > 20 or force: + #used for package generating + tmp = [(name, (url, OnlineStatus(name, plugin, status, int(size)))) + for name, size, status, url in self.cache] + + result = parseNames(tmp) + for k in result.iterkeys(): + result[k] = dict(result[k]) + + print result + + self.cache = [] + + def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): + try: + result = [] #result loaded from cache + process = [] #urls to process + for url in urls: + if url in self.m.infoCache: + result.append(self.m.infoCache[url]) + else: + process.append(url) + + if result: + self.m.core.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) + cb(pluginname, result) + + if process: + self.m.core.log.debug("Run Info Fetching for %s" % pluginname) + for result in plugin.getInfo(process): + #result = [ .. (name, size, status, url) .. ] + if not type(result) == list: result = [result] + + for res in result: + self.m.infoCache[res[3]] = res + + cb(pluginname, result) + + self.m.core.log.debug("Finished Info Fetching for %s" % pluginname) + except Exception, e: + self.m.core.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % + {"name": pluginname, "err": str(e)}) + if self.m.core.debug: + print_exc() + + #TODO: generate default results + if err: + pass |