summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--module/HookManager.py61
-rw-r--r--module/PluginThread.py10
-rw-r--r--module/PyFile.py3
-rw-r--r--module/Scheduler.py40
-rw-r--r--module/database/DatabaseBackend.py6
-rw-r--r--module/database/FileDatabase.py15
-rw-r--r--module/plugins/Hook.py35
-rwxr-xr-xpyLoadCore.py4
8 files changed, 126 insertions, 48 deletions
diff --git a/module/HookManager.py b/module/HookManager.py
index 1adc07117..be5f548d7 100644
--- a/module/HookManager.py
+++ b/module/HookManager.py
@@ -78,7 +78,7 @@ class HookManager:
#registering callback for config event
self.config.pluginCB = MethodType(self.dispatchEvent, "pluginConfigChanged", basestring)
- self.addEvent("pluginConfigChanged", self.activateHooks)
+ self.addEvent("pluginConfigChanged", self.manageHooks)
self.lock = RLock()
self.createIndex()
@@ -147,17 +147,19 @@ class HookManager:
self.plugins = plugins
- def activateHooks(self, plugin, name, value):
+ def manageHooks(self, plugin, name, value):
+ if name == "activated" and value:
+ self.activateHook(plugin)
+ elif name == "activated" and not value:
+ self.deactivateHook(plugin)
- if name != "activated" or not value:
- return
+ def activateHook(self, plugin):
#check if already loaded
for inst in self.plugins:
if inst.__name__ == plugin:
return
-
pluginClass = self.core.pluginManager.getHookPlugin(plugin)
if not pluginClass: return
@@ -171,25 +173,23 @@ class HookManager:
# call core Ready
start_new_thread(plugin.coreReady, tuple())
- # init periodical call
- self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False)
+ def deactivateHook(self, plugin):
+ hook = None
+ for inst in self.plugins:
+ if inst.__name__ == plugin:
+ hook = inst
- def wrapPeriodical(self, plugin):
- plugin.lastCall = time()
- try:
- if plugin.isActivated(): plugin.periodical()
- except Exception, e:
- self.core.log.error(_("Error executing hooks: %s") % str(e))
- if self.core.debug:
- traceback.print_exc()
+ if not hook: return
- self.core.scheduler.addJob(plugin.interval, self.wrapPeriodical, args=[plugin], threaded=False)
+ self.log.debug("Plugin unloaded: %s" % plugin)
- def initPeriodical(self):
- for plugin in self.plugins:
- if plugin.isActivated() and plugin.interval >= 1:
- self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False)
+ hook.unload()
+
+ #remove periodic call
+ self.log.debug("Removed callback %s" % self.core.scheduler.removeJob(hook.cb))
+ self.plugins.remove(hook)
+ del self.pluginMap[hook.__name__]
@try_catch
@@ -199,7 +199,14 @@ class HookManager:
plugin.coreReady()
self.dispatchEvent("coreReady")
- self.initPeriodical()
+
+ @try_catch
+ def coreExiting(self):
+ for plugin in self.plugins:
+ if plugin.isActivated():
+ plugin.coreExiting()
+
+ self.dispatchEvent("coreExiting")
@lock
def downloadStarts(self, pyfile):
@@ -222,6 +229,18 @@ class HookManager:
self.dispatchEvent("downloadFinished", pyfile)
@lock
+ @try_catch
+ def downloadFailed(self, pyfile):
+ for plugin in self.plugins:
+ if plugin.isActivated():
+ if "downloadFailed" in plugin.__threaded__:
+ self.startThread(plugin.downloadFinished, pyfile)
+ else:
+ plugin.downloadFailed(pyfile)
+
+ self.dispatchEvent("downloadFailed", pyfile)
+
+ @lock
def packageFinished(self, package):
for plugin in self.plugins:
if plugin.isActivated():
diff --git a/module/PluginThread.py b/module/PluginThread.py
index be29a680e..c5b85a043 100644
--- a/module/PluginThread.py
+++ b/module/PluginThread.py
@@ -198,6 +198,7 @@ class DownloadThread(PluginThread):
self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg})
pyfile.error = msg
+ self.m.core.hookManager.downloadFailed(pyfile)
self.clean(pyfile)
continue
@@ -238,6 +239,8 @@ class DownloadThread(PluginThread):
print_exc()
self.writeDebugReport(pyfile)
+ self.m.core.hookManager.downloadFailed(pyfile)
+
self.clean(pyfile)
continue
@@ -266,12 +269,13 @@ class DownloadThread(PluginThread):
print_exc()
self.writeDebugReport(pyfile)
+ self.m.core.hookManager.downloadFailed(pyfile)
self.clean(pyfile)
continue
finally:
self.m.core.files.save()
- self.m.core.files.checkAllLinksProcessed()
+ pyfile.checkIfProcessed()
exc_clear()
self.m.log.info(_("Download finished: %s") % pyfile.name)
@@ -285,12 +289,12 @@ class DownloadThread(PluginThread):
pyfile.finishIfDone()
self.m.core.files.save()
- #----------------------------------------------------------------------
+
def put(self, job):
"""assing job to thread"""
self.queue.put(job)
- #----------------------------------------------------------------------
+
def stop(self):
"""stops the thread"""
self.put("quit")
diff --git a/module/PyFile.py b/module/PyFile.py
index 07347fb93..de8ed1145 100644
--- a/module/PyFile.py
+++ b/module/PyFile.py
@@ -194,6 +194,9 @@ class PyFile(object):
self.release()
self.m.checkAllLinksFinished()
return True
+
+ def checkIfProcessed(self):
+ self.m.checkAllLinksProcessed(self.id)
def formatWait(self):
""" formats and return wait time in humanreadable format """
diff --git a/module/Scheduler.py b/module/Scheduler.py
index 5837dec9e..0bc396b69 100644
--- a/module/Scheduler.py
+++ b/module/Scheduler.py
@@ -39,23 +39,42 @@ class Deferred():
raise AlreadyCalled
self.result = (args, kwargs)
for f, cargs, ckwargs in self.call:
- args+=tuple(cargs)
+ args += tuple(cargs)
kwargs.update(ckwargs)
- f(*args **kwargs)
+ f(*args ** kwargs)
+
class Scheduler():
def __init__(self, core):
self.core = core
-
+
self.queue = PriorityQueue()
-
+
def addJob(self, t, call, args=[], kwargs={}, threaded=True):
d = Deferred()
t += time()
j = Job(t, call, args, kwargs, d, threaded)
self.queue.put((t, j))
return d
-
+
+
+ def removeJob(self, d):
+ """
+ :param d: defered object
+ :return: if job was deleted
+ """
+ index = -1
+
+ for i, j in enumerate(self.queue):
+ if j[1].deferred == d:
+ index = i
+
+ if index >= 0:
+ del self.queue[index]
+ return True
+
+ return False
+
def work(self):
while True:
t, j = self.queue.get()
@@ -68,6 +87,7 @@ class Scheduler():
self.queue.put((t, j))
break
+
class Job():
def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True):
self.time = float(time)
@@ -90,12 +110,20 @@ class Job():
else:
self.run()
+
class PriorityQueue():
""" a non blocking priority queue """
+
def __init__(self):
self.queue = []
self.lock = Lock()
+ def __iter__(self):
+ return iter(self.queue)
+
+ def __delitem__(self, key):
+ del self.queue[key]
+
def put(self, element):
self.lock.acquire()
heappush(self.queue, element)
@@ -108,6 +136,6 @@ class PriorityQueue():
el = heappop(self.queue)
return el
except IndexError:
- return None,None
+ return None, None
finally:
self.lock.release() \ No newline at end of file
diff --git a/module/database/DatabaseBackend.py b/module/database/DatabaseBackend.py
index d330262f9..9530390c3 100644
--- a/module/database/DatabaseBackend.py
+++ b/module/database/DatabaseBackend.py
@@ -76,8 +76,8 @@ class DatabaseJob():
self.result = None
self.exception = False
- #import inspect
- #self.frame = inspect.currentframe()
+# import inspect
+# self.frame = inspect.currentframe()
def __repr__(self):
from os.path import basename
@@ -89,7 +89,7 @@ class DatabaseJob():
del frame
del self.frame
- return "DataBase Job %s:%s\n%s" % (self.f.__name__, self.args[1:], output)
+ return "DataBase Job %s:%s\n%sResult: %s" % (self.f.__name__, self.args[1:], output, self.result)
def processJob(self):
try:
diff --git a/module/database/FileDatabase.py b/module/database/FileDatabase.py
index 19205dac6..2ca7fd07d 100644
--- a/module/database/FileDatabase.py
+++ b/module/database/FileDatabase.py
@@ -356,20 +356,19 @@ class FileHandler:
"""checks if all files are finished and dispatch event"""
if not self.getQueueCount(True):
- #hope its not called together with all DownloadsProcessed
- self.core.hookManager.dispatchEvent("allDownloadsProcessed")
self.core.hookManager.dispatchEvent("allDownloadsFinished")
self.core.log.debug("All downloads finished")
return True
return False
- def checkAllLinksProcessed(self):
- """checks if all files was processed and pyload would idle now"""
+ def checkAllLinksProcessed(self, fid):
+ """checks if all files was processed and pyload would idle now, needs fid which will be ignored when counting"""
+ # reset count so statistic will update (this is called when dl was processed)
self.resetCount()
-
- if not self.db.processcount(1):
+
+ if not self.db.processcount(1, fid):
self.core.hookManager.dispatchEvent("allDownloadsProcessed")
self.core.log.debug("All downloads processed")
return True
@@ -564,9 +563,9 @@ class FileMethods():
return self.c.fetchone()[0]
@style.queue
- def processcount(self, queue):
+ def processcount(self, queue, fid):
""" number of files which have to be proccessed """
- self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12)", (queue, ))
+ self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12) AND l.id != ?", (queue, str(fid)))
return self.c.fetchone()[0]
@style.inner
diff --git a/module/plugins/Hook.py b/module/plugins/Hook.py
index 3db3e47e9..7e4f58c66 100644
--- a/module/plugins/Hook.py
+++ b/module/plugins/Hook.py
@@ -19,7 +19,7 @@
"""
from thread import start_new_thread
-
+from traceback import print_exc
class Expose(object):
""" used for decoration to declare rpc services """
@@ -71,6 +71,9 @@ class Hook():
#: Provide information in dict here, usable by API `getInfo`
self.info = None
+ #: Callback of periodical job task, used by hookmanager
+ self.cb = None
+
#: `HookManager`
self.manager = manager
@@ -92,13 +95,33 @@ class Hook():
self.event_list = None
+ self.initPeriodical()
self.setup()
+ def initPeriodical(self):
+ if self.interval >=1:
+ self.cb = self.core.scheduler.addJob(0, self._periodical, threaded=False)
+
+ def _periodical(self):
+ try:
+ if self.isActivated(): self.periodical()
+ except Exception, e:
+ self.core.log.error(_("Error executing hooks: %s") % str(e))
+ if self.core.debug:
+ print_exc()
+
+ self.cb = self.core.scheduler.addJob(self.interval, self._periodical, threaded=False)
+
+
def __repr__(self):
return "<Hook %s>" % self.__name__
def setup(self):
- """ more init stuff if needed"""
+ """ more init stuff if needed """
+ pass
+
+ def unload(self):
+ """ called when hook was deactivated """
pass
def isActivated(self):
@@ -126,6 +149,9 @@ class Hook():
#event methods - overwrite these if needed
def coreReady(self):
pass
+
+ def coreExiting(self):
+ pass
def downloadStarts(self, pyfile):
pass
@@ -138,10 +164,7 @@ class Hook():
def packageFinished(self, pypack):
pass
-
- def packageFailed(self, pypack):
- pass
-
+
def beforeReconnecting(self, ip):
pass
diff --git a/pyLoadCore.py b/pyLoadCore.py
index be916f712..d4475e247 100755
--- a/pyLoadCore.py
+++ b/pyLoadCore.py
@@ -542,7 +542,7 @@ class Core(object):
try:
if self.config['webinterface']['activated'] and hasattr(self, "webserver"):
self.webserver.quit()
- #self.webserver.join()
+
for thread in self.threadManager.threads:
thread.put("quit")
pyfiles = self.files.cache.values()
@@ -550,6 +550,8 @@ class Core(object):
for pyfile in pyfiles:
pyfile.abortDownload()
+ self.hookManager.coreExiting()
+
except:
if self.debug:
print_exc()