diff options
author | RaNaN <Mast3rRaNaN@hotmail.de> | 2011-08-16 20:11:57 +0200 |
---|---|---|
committer | RaNaN <Mast3rRaNaN@hotmail.de> | 2011-08-16 20:11:57 +0200 |
commit | 218ef6ec665fac20928d293e950458960f36c633 (patch) | |
tree | b2d279d6e45ac1b556db5df215ebb464333a0891 | |
parent | updated thrift, --clean method, youtube fix closed #379 (diff) | |
download | pyload-218ef6ec665fac20928d293e950458960f36c633.tar.xz |
plugin unloading, closed #377, some fixes
-rw-r--r-- | module/HookManager.py | 61 | ||||
-rw-r--r-- | module/PluginThread.py | 10 | ||||
-rw-r--r-- | module/PyFile.py | 3 | ||||
-rw-r--r-- | module/Scheduler.py | 40 | ||||
-rw-r--r-- | module/database/DatabaseBackend.py | 6 | ||||
-rw-r--r-- | module/database/FileDatabase.py | 15 | ||||
-rw-r--r-- | module/plugins/Hook.py | 35 | ||||
-rwxr-xr-x | pyLoadCore.py | 4 |
8 files changed, 126 insertions, 48 deletions
diff --git a/module/HookManager.py b/module/HookManager.py index 1adc07117..be5f548d7 100644 --- a/module/HookManager.py +++ b/module/HookManager.py @@ -78,7 +78,7 @@ class HookManager: #registering callback for config event self.config.pluginCB = MethodType(self.dispatchEvent, "pluginConfigChanged", basestring) - self.addEvent("pluginConfigChanged", self.activateHooks) + self.addEvent("pluginConfigChanged", self.manageHooks) self.lock = RLock() self.createIndex() @@ -147,17 +147,19 @@ class HookManager: self.plugins = plugins - def activateHooks(self, plugin, name, value): + def manageHooks(self, plugin, name, value): + if name == "activated" and value: + self.activateHook(plugin) + elif name == "activated" and not value: + self.deactivateHook(plugin) - if name != "activated" or not value: - return + def activateHook(self, plugin): #check if already loaded for inst in self.plugins: if inst.__name__ == plugin: return - pluginClass = self.core.pluginManager.getHookPlugin(plugin) if not pluginClass: return @@ -171,25 +173,23 @@ class HookManager: # call core Ready start_new_thread(plugin.coreReady, tuple()) - # init periodical call - self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False) + def deactivateHook(self, plugin): + hook = None + for inst in self.plugins: + if inst.__name__ == plugin: + hook = inst - def wrapPeriodical(self, plugin): - plugin.lastCall = time() - try: - if plugin.isActivated(): plugin.periodical() - except Exception, e: - self.core.log.error(_("Error executing hooks: %s") % str(e)) - if self.core.debug: - traceback.print_exc() + if not hook: return - self.core.scheduler.addJob(plugin.interval, self.wrapPeriodical, args=[plugin], threaded=False) + self.log.debug("Plugin unloaded: %s" % plugin) - def initPeriodical(self): - for plugin in self.plugins: - if plugin.isActivated() and plugin.interval >= 1: - self.core.scheduler.addJob(0, self.wrapPeriodical, args=[plugin], threaded=False) + hook.unload() + + #remove periodic call + self.log.debug("Removed callback %s" % self.core.scheduler.removeJob(hook.cb)) + self.plugins.remove(hook) + del self.pluginMap[hook.__name__] @try_catch @@ -199,7 +199,14 @@ class HookManager: plugin.coreReady() self.dispatchEvent("coreReady") - self.initPeriodical() + + @try_catch + def coreExiting(self): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.coreExiting() + + self.dispatchEvent("coreExiting") @lock def downloadStarts(self, pyfile): @@ -222,6 +229,18 @@ class HookManager: self.dispatchEvent("downloadFinished", pyfile) @lock + @try_catch + def downloadFailed(self, pyfile): + for plugin in self.plugins: + if plugin.isActivated(): + if "downloadFailed" in plugin.__threaded__: + self.startThread(plugin.downloadFinished, pyfile) + else: + plugin.downloadFailed(pyfile) + + self.dispatchEvent("downloadFailed", pyfile) + + @lock def packageFinished(self, package): for plugin in self.plugins: if plugin.isActivated(): diff --git a/module/PluginThread.py b/module/PluginThread.py index be29a680e..c5b85a043 100644 --- a/module/PluginThread.py +++ b/module/PluginThread.py @@ -198,6 +198,7 @@ class DownloadThread(PluginThread): self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) pyfile.error = msg + self.m.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) continue @@ -238,6 +239,8 @@ class DownloadThread(PluginThread): print_exc() self.writeDebugReport(pyfile) + self.m.core.hookManager.downloadFailed(pyfile) + self.clean(pyfile) continue @@ -266,12 +269,13 @@ class DownloadThread(PluginThread): print_exc() self.writeDebugReport(pyfile) + self.m.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) continue finally: self.m.core.files.save() - self.m.core.files.checkAllLinksProcessed() + pyfile.checkIfProcessed() exc_clear() self.m.log.info(_("Download finished: %s") % pyfile.name) @@ -285,12 +289,12 @@ class DownloadThread(PluginThread): pyfile.finishIfDone() self.m.core.files.save() - #---------------------------------------------------------------------- + def put(self, job): """assing job to thread""" self.queue.put(job) - #---------------------------------------------------------------------- + def stop(self): """stops the thread""" self.put("quit") diff --git a/module/PyFile.py b/module/PyFile.py index 07347fb93..de8ed1145 100644 --- a/module/PyFile.py +++ b/module/PyFile.py @@ -194,6 +194,9 @@ class PyFile(object): self.release() self.m.checkAllLinksFinished() return True + + def checkIfProcessed(self): + self.m.checkAllLinksProcessed(self.id) def formatWait(self): """ formats and return wait time in humanreadable format """ diff --git a/module/Scheduler.py b/module/Scheduler.py index 5837dec9e..0bc396b69 100644 --- a/module/Scheduler.py +++ b/module/Scheduler.py @@ -39,23 +39,42 @@ class Deferred(): raise AlreadyCalled self.result = (args, kwargs) for f, cargs, ckwargs in self.call: - args+=tuple(cargs) + args += tuple(cargs) kwargs.update(ckwargs) - f(*args **kwargs) + f(*args ** kwargs) + class Scheduler(): def __init__(self, core): self.core = core - + self.queue = PriorityQueue() - + def addJob(self, t, call, args=[], kwargs={}, threaded=True): d = Deferred() t += time() j = Job(t, call, args, kwargs, d, threaded) self.queue.put((t, j)) return d - + + + def removeJob(self, d): + """ + :param d: defered object + :return: if job was deleted + """ + index = -1 + + for i, j in enumerate(self.queue): + if j[1].deferred == d: + index = i + + if index >= 0: + del self.queue[index] + return True + + return False + def work(self): while True: t, j = self.queue.get() @@ -68,6 +87,7 @@ class Scheduler(): self.queue.put((t, j)) break + class Job(): def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True): self.time = float(time) @@ -90,12 +110,20 @@ class Job(): else: self.run() + class PriorityQueue(): """ a non blocking priority queue """ + def __init__(self): self.queue = [] self.lock = Lock() + def __iter__(self): + return iter(self.queue) + + def __delitem__(self, key): + del self.queue[key] + def put(self, element): self.lock.acquire() heappush(self.queue, element) @@ -108,6 +136,6 @@ class PriorityQueue(): el = heappop(self.queue) return el except IndexError: - return None,None + return None, None finally: self.lock.release()
\ No newline at end of file diff --git a/module/database/DatabaseBackend.py b/module/database/DatabaseBackend.py index d330262f9..9530390c3 100644 --- a/module/database/DatabaseBackend.py +++ b/module/database/DatabaseBackend.py @@ -76,8 +76,8 @@ class DatabaseJob(): self.result = None self.exception = False - #import inspect - #self.frame = inspect.currentframe() +# import inspect +# self.frame = inspect.currentframe() def __repr__(self): from os.path import basename @@ -89,7 +89,7 @@ class DatabaseJob(): del frame del self.frame - return "DataBase Job %s:%s\n%s" % (self.f.__name__, self.args[1:], output) + return "DataBase Job %s:%s\n%sResult: %s" % (self.f.__name__, self.args[1:], output, self.result) def processJob(self): try: diff --git a/module/database/FileDatabase.py b/module/database/FileDatabase.py index 19205dac6..2ca7fd07d 100644 --- a/module/database/FileDatabase.py +++ b/module/database/FileDatabase.py @@ -356,20 +356,19 @@ class FileHandler: """checks if all files are finished and dispatch event""" if not self.getQueueCount(True): - #hope its not called together with all DownloadsProcessed - self.core.hookManager.dispatchEvent("allDownloadsProcessed") self.core.hookManager.dispatchEvent("allDownloadsFinished") self.core.log.debug("All downloads finished") return True return False - def checkAllLinksProcessed(self): - """checks if all files was processed and pyload would idle now""" + def checkAllLinksProcessed(self, fid): + """checks if all files was processed and pyload would idle now, needs fid which will be ignored when counting""" + # reset count so statistic will update (this is called when dl was processed) self.resetCount() - - if not self.db.processcount(1): + + if not self.db.processcount(1, fid): self.core.hookManager.dispatchEvent("allDownloadsProcessed") self.core.log.debug("All downloads processed") return True @@ -564,9 +563,9 @@ class FileMethods(): return self.c.fetchone()[0] @style.queue - def processcount(self, queue): + def processcount(self, queue, fid): """ number of files which have to be proccessed """ - self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12)", (queue, )) + self.c.execute("SELECT COUNT(*) FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? AND l.status IN (2,3,5,7,12) AND l.id != ?", (queue, str(fid))) return self.c.fetchone()[0] @style.inner diff --git a/module/plugins/Hook.py b/module/plugins/Hook.py index 3db3e47e9..7e4f58c66 100644 --- a/module/plugins/Hook.py +++ b/module/plugins/Hook.py @@ -19,7 +19,7 @@ """ from thread import start_new_thread - +from traceback import print_exc class Expose(object): """ used for decoration to declare rpc services """ @@ -71,6 +71,9 @@ class Hook(): #: Provide information in dict here, usable by API `getInfo` self.info = None + #: Callback of periodical job task, used by hookmanager + self.cb = None + #: `HookManager` self.manager = manager @@ -92,13 +95,33 @@ class Hook(): self.event_list = None + self.initPeriodical() self.setup() + def initPeriodical(self): + if self.interval >=1: + self.cb = self.core.scheduler.addJob(0, self._periodical, threaded=False) + + def _periodical(self): + try: + if self.isActivated(): self.periodical() + except Exception, e: + self.core.log.error(_("Error executing hooks: %s") % str(e)) + if self.core.debug: + print_exc() + + self.cb = self.core.scheduler.addJob(self.interval, self._periodical, threaded=False) + + def __repr__(self): return "<Hook %s>" % self.__name__ def setup(self): - """ more init stuff if needed""" + """ more init stuff if needed """ + pass + + def unload(self): + """ called when hook was deactivated """ pass def isActivated(self): @@ -126,6 +149,9 @@ class Hook(): #event methods - overwrite these if needed def coreReady(self): pass + + def coreExiting(self): + pass def downloadStarts(self, pyfile): pass @@ -138,10 +164,7 @@ class Hook(): def packageFinished(self, pypack): pass - - def packageFailed(self, pypack): - pass - + def beforeReconnecting(self, ip): pass diff --git a/pyLoadCore.py b/pyLoadCore.py index be916f712..d4475e247 100755 --- a/pyLoadCore.py +++ b/pyLoadCore.py @@ -542,7 +542,7 @@ class Core(object): try: if self.config['webinterface']['activated'] and hasattr(self, "webserver"): self.webserver.quit() - #self.webserver.join() + for thread in self.threadManager.threads: thread.put("quit") pyfiles = self.files.cache.values() @@ -550,6 +550,8 @@ class Core(object): for pyfile in pyfiles: pyfile.abortDownload() + self.hookManager.coreExiting() + except: if self.debug: print_exc() |