Diffstat (limited to 'module/PluginThread.py')
-rw-r--r--  module/PluginThread.py | 170
1 files changed, 115 insertions, 55 deletions
diff --git a/module/PluginThread.py b/module/PluginThread.py
index e0e3b17c9..a44981c52 100644
--- a/module/PluginThread.py
+++ b/module/PluginThread.py
@@ -30,8 +30,10 @@ from types import MethodType
from pycurl import error
-from module.PyFile import PyFile
-from module.plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload
+from PyFile import PyFile
+from plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload
+from common.packagetools import parseNames
+from remote.thriftbackend.thriftgen.pyload.ttypes import OnlineStatus
class PluginThread(Thread):
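
Note: the import hunk drops the "module." prefix; PluginThread.py itself lives inside the "module" package, so Python 2's implicit relative imports resolve its sibling modules without the prefix. The two new imports (parseNames, OnlineStatus) serve the reworked InfoThread further down. A minimal sketch of the import difference, for orientation only:

    # old form: absolute import of a sibling module via the top-level package
    from module.PyFile import PyFile

    # new form: implicit relative import, valid in Python 2 from inside the "module" package
    from PyFile import PyFile

    # the explicit, Python 3 compatible spelling would be:
    # from .PyFile import PyFile
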
@@ -46,7 +48,8 @@ class PluginThread(Thread):
def writeDebugReport(self, pyfile):
- dump = "pyLoad %s Debug Report of %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (self.m.core.api.getServerVersion(), pyfile.pluginname, format_exc())
+ dump = "pyLoad %s Debug Report of %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
+ self.m.core.api.getServerVersion(), pyfile.pluginname, format_exc())
tb = exc_info()[2]
stack = []
@@ -55,18 +58,17 @@ class PluginThread(Thread):
tb = tb.tb_next
for frame in stack[1:]:
-
dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name,
- frame.f_code.co_filename,
- frame.f_lineno)
+ frame.f_code.co_filename,
+ frame.f_lineno)
for key, value in frame.f_locals.items():
dump += "\t%20s = " % key
try:
dump += pformat(value) + "\n"
except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n"
-
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+
del frame
del stack #delete it just to be sure...
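
Note: writeDebugReport walks the traceback's frame chain and pformats every local variable, guarding against values whose repr itself raises. A standalone sketch of the same pattern (dump_frames is an illustrative name, not part of pyLoad):

    from pprint import pformat
    from sys import exc_info
    from traceback import format_exc

    def dump_frames():
        """Render every frame of the current exception's traceback, locals included."""
        dump = "TRACEBACK:\n%s\nFRAMESTACK:\n" % format_exc()
        tb = exc_info()[2]
        stack = []
        while tb:                              # collect all frames along the traceback
            stack.append(tb.tb_frame)
            tb = tb.tb_next
        for frame in stack:
            dump += "\nFrame %s in %s at line %s\n" % (
                frame.f_code.co_name, frame.f_code.co_filename, frame.f_lineno)
            for key, value in frame.f_locals.items():
                try:
                    dump += "\t%20s = %s\n" % (key, pformat(value))
                except Exception, e:           # a broken __repr__ must not kill the report
                    dump += "\t%20s = <ERROR WHILE PRINTING VALUE> %s\n" % (key, str(e))
            del frame
        del stack                              # drop frame references to avoid cycles
        return dump

    try:
        1 / 0
    except ZeroDivisionError:
        print dump_frames()
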
@@ -80,7 +82,7 @@ class PluginThread(Thread):
try:
dump += pformat(attr) + "\n"
except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n"
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
dump += "\nPYFILE OBJECT DUMP: \n\n"
@@ -91,14 +93,11 @@ class PluginThread(Thread):
try:
dump += pformat(attr) + "\n"
except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> "+ str(e) +"\n"
-
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
if pyfile.pluginname in self.m.core.config.plugin:
dump += "\n\nCONFIG: \n\n"
- dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) +"\n"
-
-
+ dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) + "\n"
dump_name = "debug_%s_%s.txt" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S"))
self.m.core.log.info("Debug Report written to %s" % dump_name)
@@ -142,7 +141,6 @@ class DownloadThread(PluginThread):
return True
try:
-
if not pyfile.hasPlugin(): continue
#this pyfile was deleted while queueing
@@ -154,7 +152,6 @@ class DownloadThread(PluginThread):
pyfile.plugin.preprocessing(self)
except NotImplementedError:
-
self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname)
pyfile.setStatus("failed")
pyfile.error = "Plugin does not work"
@@ -164,9 +161,9 @@ class DownloadThread(PluginThread):
except Abort:
try:
self.m.log.info(_("Download aborted: %s") % pyfile.name)
- except :
+ except:
pass
-
+
pyfile.setStatus("aborted")
self.clean(pyfile)
@@ -182,14 +179,12 @@ class DownloadThread(PluginThread):
continue
except Retry, e:
-
reason = e.args[0]
- self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name" : pyfile.name, "msg": reason})
+ self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason})
self.queue.put(pyfile)
continue
except Fail, e:
-
msg = e.args[0]
if msg == "offline":
@@ -218,7 +213,7 @@ class DownloadThread(PluginThread):
if code in (7, 18, 28, 52, 56):
self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry."))
wait = time() + 60
-
+
pyfile.waitUntil = wait
pyfile.setStatus("waiting")
while time() < wait:
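
Note: the error codes checked here are libcurl's CURLE_COULDNT_CONNECT (7), CURLE_PARTIAL_FILE (18), CURLE_OPERATION_TIMEDOUT (28), CURLE_GOT_NOTHING (52) and CURLE_RECV_ERROR (56); on any of them the file is parked in "waiting" for a minute before the download is retried. A condensed sketch of that wait loop, assuming pyfile exposes waitUntil, setStatus and abort as PyFile does:

    from time import time, sleep

    def wait_before_retry(pyfile, seconds=60):
        # park the file in "waiting" and sleep until the deadline or an abort
        wait = time() + seconds
        pyfile.waitUntil = wait
        pyfile.setStatus("waiting")
        while time() < wait:
            sleep(1)
            if pyfile.abort:
                break
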
@@ -247,10 +242,10 @@ class DownloadThread(PluginThread):
continue
except SkipDownload, e:
-
pyfile.setStatus("skipped")
- self.m.log.info(_("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message})
+ self.m.log.info(
+ _("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message})
self.clean(pyfile)
@@ -279,7 +274,6 @@ class DownloadThread(PluginThread):
self.m.core.files.checkAllLinksProcessed()
exc_clear()
-
self.m.log.info(_("Download finished: %s") % pyfile.name)
#pyfile.plugin.req.clean()
@@ -302,7 +296,6 @@ class DownloadThread(PluginThread):
self.put("quit")
-
class DecrypterThread(PluginThread):
"""thread for decrypting"""
@@ -330,12 +323,10 @@ class DecrypterThread(PluginThread):
self.active.plugin.preprocessing(self)
except NotImplementedError:
-
self.m.log.error(_("Plugin %s is missing a function.") % self.active.pluginname)
return
except Fail, e:
-
msg = e.args[0]
if msg == "offline":
@@ -343,28 +334,25 @@ class DecrypterThread(PluginThread):
self.m.log.warning(_("Download is offline: %s") % self.active.name)
else:
self.active.setStatus("failed")
- self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % { "name" : self.active.name, "msg":msg })
+ self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": msg})
self.active.error = msg
return
-
+
except Abort:
-
self.m.log.info(_("Download aborted: %s") % pyfile.name)
pyfile.setStatus("aborted")
-
+
return
except Retry:
-
self.m.log.info(_("Retrying %s") % self.active.name)
retry = True
return self.run()
except Exception, e:
-
self.active.setStatus("failed")
- self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % { "name" : self.active.name, "msg" :str(e) })
+ self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": str(e)})
self.active.error = str(e)
if self.m.core.debug:
@@ -391,6 +379,7 @@ class DecrypterThread(PluginThread):
if not retry:
pyfile.delete()
+
class HookThread(PluginThread):
"""thread for hooks"""
@@ -403,7 +392,7 @@ class HookThread(PluginThread):
self.active = pyfile
m.localThreads.append(self)
-
+
if isinstance(pyfile, PyFile):
pyfile.setStatus("processing")
@@ -411,22 +400,26 @@ class HookThread(PluginThread):
def run(self):
self.f(self.active)
-
+
self.m.localThreads.remove(self)
if isinstance(self.active, PyFile):
self.active.finishIfDone()
class InfoThread(PluginThread):
-
#----------------------------------------------------------------------
- def __init__(self, manager, data, pid):
+ def __init__(self, manager, data, pid=-1, rid=-1):
"""Constructor"""
PluginThread.__init__(self, manager)
self.data = data
self.pid = pid # package id
# [ .. (name, plugin) .. ]
+
+ self.rid = rid #result id
+
+ self.cache = [] #accumulated data
+
self.start()
#----------------------------------------------------------------------
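
Note: InfoThread now takes an optional package id and result id. With pid > -1 the fetched file info is written straight to the database; with an rid the results are accumulated in self.cache and posted under the thread manager's infoResults. Hypothetical call sites, based only on the signature in this hunk (manager, urls, package_id and result_id are placeholders):

    # write fetched info directly into an existing package
    InfoThread(manager, urls, pid=package_id)

    # collect results for later polling via manager.infoResults[result_id]
    InfoThread(manager, urls, rid=result_id)
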
@@ -441,21 +434,88 @@ class InfoThread(PluginThread):
else:
plugins[plugin] = [url]
- for pluginname, urls in plugins.iteritems():
- plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
- if hasattr(plugin, "getInfo"):
- try:
- self.m.core.log.debug("Run Info Fetching for %s" % pluginname)
- for result in plugin.getInfo(urls):
- #result = [ .. (name, size, status, url) .. ]
- if not type(result) == list: result = [result]
- self.m.core.files.updateFileInfo(result, self.pid)
+ #directly write to database
+ if self.pid > -1:
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
+ if hasattr(plugin, "getInfo"):
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
+ self.m.core.files.save()
- self.m.core.log.debug("Finished Info Fetching for %s" % pluginname)
+ else: #post the results
- self.m.core.files.save()
- except Exception, e:
- self.m.core.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % {"name": pluginname, "err": str(e)} )
- if self.m.core.debug:
- print_exc()
-
+ self.m.infoResults[self.rid] = {}
+
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
+ if hasattr(plugin, "getInfo"):
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
+
+ #force to process cache
+ if self.cache:
+ self.updateResult(pluginname, [], True)
+
+ else:
+ #generate default result
+ pass
+
+ self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = []
+
+
+ def updateDB(self, plugin, result):
+ self.m.core.files.updateFileInfo(result, self.pid)
+
+ def updateResult(self, plugin, result, force=False):
+ #parse package name and generate result
+ #accumulate results
+
+ self.cache.extend(result)
+
+ if len(self.cache) > 20 or force:
+ #used for package generating
+ tmp = [(name, (url, OnlineStatus(name, plugin, status, int(size))))
+ for name, size, status, url in self.cache]
+
+ result = parseNames(tmp)
+ for k in result.iterkeys():
+ result[k] = dict(result[k])
+
+ print result
+
+ self.cache = []
+
+ def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
+ try:
+ result = [] #result loaded from cache
+ process = [] #urls to process
+ for url in urls:
+ if url in self.m.infoCache:
+ result.append(self.m.infoCache[url])
+ else:
+ process.append(url)
+
+ if result:
+ self.m.core.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
+ cb(pluginname, result)
+
+ if process:
+ self.m.core.log.debug("Run Info Fetching for %s" % pluginname)
+ for result in plugin.getInfo(process):
+ #result = [ .. (name, size, status, url) .. ]
+ if not type(result) == list: result = [result]
+
+ for res in result:
+ self.m.infoCache[res[3]] = res
+
+ cb(pluginname, result)
+
+ self.m.core.log.debug("Finished Info Fetching for %s" % pluginname)
+ except Exception, e:
+ self.m.core.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
+ {"name": pluginname, "err": str(e)})
+ if self.m.core.debug:
+ print_exc()
+
+ #TODO: generate default results
+ if err:
+ pass
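
Note: fetchForPlugin first serves URLs from the thread manager's infoCache, then calls the plugin's getInfo() only for the remainder, caching each fresh (name, size, status, url) tuple by its URL before handing it to the callback. updateResult batches those tuples and, once more than 20 have accumulated or on the final forced flush, groups them into packages. A minimal illustration of that grouping step; the plugin name, URLs and status value are placeholders, the import paths follow the relative imports at the top of this file, and parseNames is assumed to map each derived package name to the values passed in alongside the file names:

    from module.common.packagetools import parseNames
    from module.remote.thriftbackend.thriftgen.pyload.ttypes import OnlineStatus

    # accumulated getInfo results: (name, size, status, url)
    cache = [
        ("video.part1.rar", 1048576, 2, "http://host/video.part1.rar"),
        ("video.part2.rar", 1048576, 2, "http://host/video.part2.rar"),
    ]

    # pair every file name with (url, OnlineStatus), mirroring updateResult above
    tmp = [(name, (url, OnlineStatus(name, "SomePlugin", status, int(size))))
           for name, size, status, url in cache]

    packages = parseNames(tmp)               # assumed: {packagename: [(url, OnlineStatus), ...]}
    for k in packages.iterkeys():
        packages[k] = dict(packages[k])      # per package: url -> OnlineStatus
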