diff options
author | RaNaN <Mast3rRaNaN@hotmail.de> | 2013-09-26 16:40:38 +0200 |
---|---|---|
committer | RaNaN <Mast3rRaNaN@hotmail.de> | 2013-09-26 16:40:50 +0200 |
commit | 967d6dd16c25ceba22dcd105079f72534ddb87e9 (patch) | |
tree | 4c971ff446dc955f1884e5aa80ef4cb62bbf55fe /pyload/threads | |
parent | new DLC plugins (diff) | |
download | pyload-967d6dd16c25ceba22dcd105079f72534ddb87e9.tar.xz |
rewritten decrypter and info fetching thread
Diffstat (limited to 'pyload/threads')
-rw-r--r-- | pyload/threads/BaseThread.py | 41 | ||||
-rw-r--r-- | pyload/threads/DecrypterThread.py | 61 | ||||
-rw-r--r-- | pyload/threads/InfoThread.py | 199 | ||||
-rw-r--r-- | pyload/threads/ThreadManager.py | 46 |
4 files changed, 165 insertions, 182 deletions
diff --git a/pyload/threads/BaseThread.py b/pyload/threads/BaseThread.py index deaf03461..b7912e924 100644 --- a/pyload/threads/BaseThread.py +++ b/pyload/threads/BaseThread.py @@ -1,9 +1,9 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import sys from threading import Thread from time import strftime, gmtime -from sys import exc_info from types import MethodType from pprint import pformat from traceback import format_exc @@ -11,17 +11,18 @@ from traceback import format_exc from pyload.utils import primary_uid from pyload.utils.fs import listdir, join, save_join, stat, exists + class BaseThread(Thread): """abstract base class for thread types""" - def __init__(self, manager): + def __init__(self, manager, ower=None): Thread.__init__(self) self.setDaemon(True) self.m = manager #thread manager self.core = manager.core self.log = manager.core.log - #: Owner of the thread, every type should set it + #: Owner of the thread, every type should set it or overwrite user self.owner = None @property @@ -40,13 +41,13 @@ class BaseThread(Thread): dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S")) if pyfile: - dump = self.getFileDump(pyfile) + dump = self.getPluginDump(pyfile.plugin) + "\n" + dump += self.getFileDump(pyfile) else: dump = self.getPluginDump(plugin) try: import zipfile - zip = zipfile.ZipFile(dump_name, "w") if exists(join("tmp", name)): @@ -81,11 +82,11 @@ class BaseThread(Thread): self.log.info("Debug Report written to %s" % dump_name) return dump_name - def getFileDump(self, pyfile): + def getPluginDump(self, plugin): dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( - self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) + self.m.core.api.getServerVersion(), plugin.__name__, plugin.__version__, format_exc()) - tb = exc_info()[2] + tb = sys.exc_info()[2] stack = [] while tb: stack.append(tb.tb_frame) @@ -109,8 +110,8 @@ class BaseThread(Thread): dump += "\n\nPLUGIN OBJECT DUMP: \n\n" - for name in dir(pyfile.plugin): - attr = getattr(pyfile.plugin, name) + for name in dir(plugin): + attr = getattr(plugin, name) if not name.endswith("__") and type(attr) != MethodType: dump += "\t%20s = " % name try: @@ -118,7 +119,10 @@ class BaseThread(Thread): except Exception, e: dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - dump += "\nPYFILE OBJECT DUMP: \n\n" + return dump + + def getFileDump(self, pyfile): + dump = "PYFILE OBJECT DUMP: \n\n" for name in dir(pyfile): attr = getattr(pyfile, name) @@ -129,14 +133,13 @@ class BaseThread(Thread): except Exception, e: dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - dump += "\n\nCONFIG: \n\n" - dump += pformat(self.m.core.config.values) + "\n" - return dump - #TODO - def getPluginDump(self, plugin): - return "" - def getSystemDump(self): - return "" + dump = "SYSTEM:\n\n" + dump += """Platform: %s +Version: %s +Encoding: %s +FS-Encoding: %s + """ % (sys.platform, sys.version, sys.getdefaultencoding(), sys.getfilesystemencoding()) + return dump diff --git a/pyload/threads/DecrypterThread.py b/pyload/threads/DecrypterThread.py index e4df2ee75..e8b889ac8 100644 --- a/pyload/threads/DecrypterThread.py +++ b/pyload/threads/DecrypterThread.py @@ -2,10 +2,9 @@ # -*- coding: utf-8 -*- from time import sleep -from traceback import print_exc -from pyload.utils import uniqify -from pyload.plugins.Base import Retry +from pyload.utils import uniqify, accumulate +from pyload.plugins.Base import Abort, Retry from pyload.plugins.Crypter import Package from BaseThread import BaseThread @@ -14,30 +13,35 @@ class DecrypterThread(BaseThread): """thread for decrypting""" def __init__(self, manager, data, pid): - """constructor""" + # TODO: owner BaseThread.__init__(self, manager) + # [... (plugin, url) ...] self.data = data self.pid = pid self.start() def run(self): - plugin_map = {} - for url, plugin in self.data: - if plugin in plugin_map: - plugin_map[plugin].append(url) - else: - plugin_map[plugin] = [url] + pack = self.m.core.files.getPackage(self.pid) + links, packages = self.decrypt(accumulate(self.data), pack.password) - self.decrypt(plugin_map) + if links: + self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(links), "name": pack.name}) + self.m.core.api.addFiles(self.pid, [l.url for l in links]) - def decrypt(self, plugin_map): - pack = self.m.core.files.getPackage(self.pid) + # TODO: add single package into this one and rename it? + # TODO: nested packages + for p in packages: + self.m.core.api.addPackage(p.name, p.getURLs(), pack.password) + + def decrypt(self, plugin_map, password=None): result = [] + # TODO QUEUE_DECRYPT + for name, urls in plugin_map.iteritems(): klass = self.m.core.pluginManager.loadClass("crypter", name) - plugin = klass(self.m.core, pack, pack.password) + plugin = klass(self.m.core, password) plugin_result = [] try: @@ -46,36 +50,37 @@ class DecrypterThread(BaseThread): except Retry: sleep(1) plugin_result = plugin._decrypt(urls) + except Abort: + plugin.logInfo(_("Decrypting aborted")) except Exception, e: plugin.logError(_("Decrypting failed"), e) - if self.m.core.debug: - print_exc() + if self.core.debug: + self.core.print_exc() self.writeDebugReport(plugin.__name__, plugin=plugin) + finally: + plugin.clean() plugin.logDebug("Decrypted", plugin_result) result.extend(plugin_result) - #TODO package names are optional - result = uniqify(result) + # generated packages pack_names = {} + # urls without package urls = [] + # merge urls and packages for p in result: if isinstance(p, Package): if p.name in pack_names: pack_names[p.name].urls.extend(p.urls) else: - pack_names[p.name] = p + if not p.name: + urls.append(p) + else: + pack_names[p.name] = p else: urls.append(p) - if urls: - self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) - self.m.core.api.addFiles(self.pid, urls) - - for p in pack_names.itervalues(): - self.m.core.api.addPackage(p.name, p.urls, pack.password) - - if not result: - self.log.info(_("No links decrypted")) + urls = uniqify(urls) + return urls, pack_names.values()
\ No newline at end of file diff --git a/pyload/threads/InfoThread.py b/pyload/threads/InfoThread.py index fb4bdf11e..b62596ad3 100644 --- a/pyload/threads/InfoThread.py +++ b/pyload/threads/InfoThread.py @@ -9,25 +9,23 @@ from pyload.utils.packagetools import parseNames from pyload.utils import has_method, accumulate from BaseThread import BaseThread +from DecrypterThread import DecrypterThread -class InfoThread(BaseThread): - def __init__(self, manager, data, pid=-1, rid=-1): - """Constructor""" - BaseThread.__init__(self, manager) - self.data = data - self.pid = pid # package id - # [ .. (name, plugin) .. ] - - self.rid = rid #result id +class InfoThread(DecrypterThread): + def __init__(self, manager, owner, data, pid=-1, oc=None): + BaseThread.__init__(self, manager, owner) - self.cache = [] #accumulated data + # [... (plugin, url) ...] + self.data = data + self.pid = pid + self.oc = oc # online check + # urls that already have a package name + self.names = {} self.start() def run(self): - """run method""" - plugins = accumulate(self.data) crypter = {} @@ -37,93 +35,82 @@ class InfoThread(BaseThread): crypter[name] = plugins[name] del plugins[name] - #directly write to database - if self.pid > -1: - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPluginModule(pluginname) - klass = self.m.core.pluginManager.getPluginClass(pluginname) - if has_method(klass, "getInfo"): - self.fetchForPlugin(pluginname, klass, urls, self.updateDB) - self.m.core.files.save() - elif has_method(plugin, "getInfo"): - self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead") - self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) - self.m.core.files.save() - - else: #post the results - for name, urls in crypter.iteritems(): - #attach container content - try: - data = self.decrypt(name, urls) - except: - print_exc() - self.m.log.error("Could not decrypt content.") - data = [] - - accumulate(data, plugins) - - self.m.infoResults[self.rid] = {} - - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPluginModule(pluginname) - klass = self.m.core.pluginManager.getPluginClass(pluginname) - if has_method(klass, "getInfo"): - self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) - #force to process cache - if self.cache: - self.updateResult(pluginname, [], True) - elif has_method(plugin, "getInfo"): - self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") - self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) - #force to process cache - if self.cache: - self.updateResult(pluginname, [], True) - else: - #generate default result - result = [(url, 0, 3, url) for url in urls] - - self.updateResult(pluginname, result, True) - - self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} - + if crypter: + # decrypt them + links, packages = self.decrypt(crypter) + # push these as initial result and save package names + self.updateResult(links) + for pack in packages: + for url in pack.getURLs(): + self.names[url] = pack.name + + links.extend(pack.links) + self.updateResult(pack.links) + + # TODO: no plugin information pushed to GUI + # parse links and merge + hoster, crypter = self.m.core.pluginManager.parseUrls([l.url for l in links]) + accumulate(hoster + crypter, plugins) + + # db or info result + cb = self.updateDB if self.pid > 1 else self.updateResult + + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPluginModule(pluginname) + klass = self.m.core.pluginManager.getPluginClass(pluginname) + if has_method(klass, "getInfo"): + self.fetchForPlugin(klass, urls, cb) + # TODO: this branch can be removed in the future + elif has_method(plugin, "getInfo"): + self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") + self.fetchForPlugin(plugin, urls, cb) + + self.oc.done = True + self.names.clear() self.m.timestamp = time() + 5 * 60 - - def updateDB(self, plugin, result): - self.m.core.files.updateFileInfo(result, self.pid) - - def updateResult(self, plugin, result, force=False): - #parse package name and generate result - #accumulate results - - self.cache.extend(result) - - if len(self.cache) >= 20 or force: - #used for package generating - tmp = [(name, LinkStatus(url, name, plugin, int(size), status)) - for name, size, status, url in self.cache] - - data = parseNames(tmp) - self.m.setInfoResults(self.rid, data) - - self.cache = [] - - def updateCache(self, plugin, result): - self.cache.extend(result) - - def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): + def updateDB(self, result): + # writes results to db + # convert link info to tuples + info = [(l.name, l.size, l.status, l.url) for l in result if not l.hash] + info_hash = [(l.name, l.size, l.status, l.hash, l.url) for l in result if l.hash] + if info: + self.m.core.files.updateFileInfo(info, self.pid) + if info_hash: + self.m.core.files.updateFileInfo(info_hash, self.pid) + + def updateResult(self, result): + tmp = {} + parse = [] + # separate these with name and without + for link in result: + if link.url in self.names: + tmp[link] = self.names[link.url] + else: + parse.append(link) + + data = parseNames([(link.name, link) for link in parse]) + # merge in packages that already have a name + data = accumulate(tmp.iteritems(), data) + + self.m.setInfoResults(self.oc, data) + + def fetchForPlugin(self, plugin, urls, cb): + """executes info fetching for given plugin and urls""" + # also works on module names + pluginname = plugin.__name__.split(".")[-1] try: - result = [] #result loaded from cache + cached = [] #results loaded from cache process = [] #urls to process for url in urls: if url in self.m.infoCache: - result.append(self.m.infoCache[url]) + cached.append(self.m.infoCache[url]) else: process.append(url) - if result: - self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) - cb(pluginname, result) + if cached: + self.m.log.debug("Fetched %d links from cache for %s" % (len(cached), pluginname)) + cb(cached) if process: self.m.log.debug("Run Info Fetching for %s" % pluginname) @@ -131,26 +118,26 @@ class InfoThread(BaseThread): #result = [ .. (name, size, status, url) .. ] if not type(result) == list: result = [result] + links = [] + # Convert results to link statuses for res in result: - self.m.infoCache[res[3]] = res + if isinstance(res, LinkStatus): + links.append(res) + elif type(res) == tuple and len(res) == 4: + links.append(LinkStatus(res[3], res[0], int(res[1]), res[2], pluginname)) + elif type(res) == tuple and len(res) == 5: + links.append(LinkStatus(res[3], res[0], int(res[1]), res[2], pluginname, res[4])) + else: + self.m.log.debug("Invalid getInfo result: " + result) + + # put them on the cache + for link in links: + self.m.infoCache[link.url] = link - cb(pluginname, result) + cb(links) self.m.log.debug("Finished Info Fetching for %s" % pluginname) except Exception, e: self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % {"name": pluginname, "err": str(e)}) - if self.m.core.debug: - print_exc() - - # generate default results - if err: - result = [(url, 0, 3, url) for url in urls] - cb(pluginname, result) - - def decrypt(self, plugin, urls): - self.m.log.debug("Decrypting %s" % plugin) - klass = self.m.core.pluginManager.loadClass("crypter", plugin) - urls = klass.decrypt(self.core, urls) - data, crypter = self.m.core.pluginManager.parseUrls(urls) - return data + crypter + self.core.print_exc()
\ No newline at end of file diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py index 9ad23138a..ff8bfe8d7 100644 --- a/pyload/threads/ThreadManager.py +++ b/pyload/threads/ThreadManager.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- ############################################################################### -# Copyright(c) 2008-2012 pyLoad Team +# Copyright(c) 2008-2013 pyLoad Team # http://www.pyload.org # # This file is part of pyLoad. @@ -24,9 +24,8 @@ from time import sleep, time from traceback import print_exc from random import choice -import pycurl - from pyload.datatypes.PyFile import PyFile +from pyload.datatypes.OnlineCheck import OnlineCheck from pyload.network.RequestFactory import getURL from pyload.utils import lock, uniqify from pyload.utils.fs import free_space @@ -63,13 +62,12 @@ class ThreadManager: # pool of ids for online check self.resultIDs = 0 - # threads which are fetching hoster results + # saved online checks self.infoResults = {} + # timeout for cache purge self.timestamp = 0 - pycurl.global_init(pycurl.GLOBAL_DEFAULT) - for i in range(self.core.config.get("download", "max_downloads")): self.createThread() @@ -83,23 +81,26 @@ class ThreadManager: def createInfoThread(self, data, pid): """ start a thread which fetches online status and other info's """ self.timestamp = time() + 5 * 60 - if data: InfoThread(self, data, pid) + if data: InfoThread(self, None, data, pid) @lock - def createResultThread(self, data): + def createResultThread(self, user, data): """ creates a thread to fetch online status, returns result id """ self.timestamp = time() + 5 * 60 rid = self.resultIDs self.resultIDs += 1 + oc = OnlineCheck(rid, user) + self.infoResults[rid] = oc + # maps url to plugin urls = [] for links in data.itervalues(): for link in links: urls.append((link.url, link.plugin)) - InfoThread(self, urls, rid=rid) + InfoThread(self, user, urls, oc=oc) return rid @@ -108,23 +109,13 @@ class ThreadManager: """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" if data: DecrypterThread(self, data, pid) - @lock def getInfoResult(self, rid): - """returns result and clears it""" - self.timestamp = time() + 5 * 60 - # TODO: restrict user to his own results - if rid in self.infoResults: - data = self.infoResults[rid] - self.infoResults[rid] = {} - return data - else: - return {} + return self.infoResults.get(rid) - @lock - def setInfoResults(self, rid, result): - self.core.evm.dispatchEvent("linkcheck:updated", rid, result) - self.infoResults[rid].update(result) + def setInfoResults(self, oc, result): + self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner) + oc.update(result) def getActiveDownloads(self, user=None): # TODO: user context @@ -220,8 +211,7 @@ class ThreadManager: self.log.warning(_("Failed executing reconnect script!")) self.core.config["reconnect"]["activated"] = False self.reconnecting.clear() - if self.core.debug: - print_exc() + self.core.print_exc() return reconn.wait() @@ -268,6 +258,8 @@ class ThreadManager: """ make a global curl cleanup (currently unused) """ if self.processingIds(): return False + import pycurl + pycurl.global_cleanup() pycurl.global_init(pycurl.GLOBAL_DEFAULT) self.downloaded = 0 @@ -317,7 +309,3 @@ class ThreadManager: if occ not in self.core.files.jobCache: self.core.files.jobCache[occ] = [] self.core.files.jobCache[occ].append(job.id) - - def cleanup(self): - """do global cleanup, should be called when finished with pycurl""" - pycurl.global_cleanup() |