summary | refs | log | tree | commit | diff | stats
path: root/pyload/threads
diff options
context:
space:
mode:
author    RaNaN <Mast3rRaNaN@hotmail.de> 2013-09-26 16:40:38 +0200
committer RaNaN <Mast3rRaNaN@hotmail.de> 2013-09-26 16:40:50 +0200
commit 967d6dd16c25ceba22dcd105079f72534ddb87e9 (patch)
tree 4c971ff446dc955f1884e5aa80ef4cb62bbf55fe /pyload/threads
parent new DLC plugins (diff)
download pyload-967d6dd16c25ceba22dcd105079f72534ddb87e9.tar.xz
rewritten decrypter and info fetching thread
Diffstat (limited to 'pyload/threads')
-rw-r--r--pyload/threads/BaseThread.py41
-rw-r--r--pyload/threads/DecrypterThread.py61
-rw-r--r--pyload/threads/InfoThread.py199
-rw-r--r--pyload/threads/ThreadManager.py46
4 files changed, 165 insertions, 182 deletions
diff --git a/pyload/threads/BaseThread.py b/pyload/threads/BaseThread.py
index deaf03461..b7912e924 100644
--- a/pyload/threads/BaseThread.py
+++ b/pyload/threads/BaseThread.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import sys
from threading import Thread
from time import strftime, gmtime
-from sys import exc_info
from types import MethodType
from pprint import pformat
from traceback import format_exc
@@ -11,17 +11,18 @@ from traceback import format_exc
from pyload.utils import primary_uid
from pyload.utils.fs import listdir, join, save_join, stat, exists
+
class BaseThread(Thread):
"""abstract base class for thread types"""
- def __init__(self, manager):
+ def __init__(self, manager, ower=None):
Thread.__init__(self)
self.setDaemon(True)
self.m = manager #thread manager
self.core = manager.core
self.log = manager.core.log
- #: Owner of the thread, every type should set it
+ #: Owner of the thread, every type should set it or overwrite user
self.owner = None
@property
@@ -40,13 +41,13 @@ class BaseThread(Thread):
dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S"))
if pyfile:
- dump = self.getFileDump(pyfile)
+ dump = self.getPluginDump(pyfile.plugin) + "\n"
+ dump += self.getFileDump(pyfile)
else:
dump = self.getPluginDump(plugin)
try:
import zipfile
-
zip = zipfile.ZipFile(dump_name, "w")
if exists(join("tmp", name)):
@@ -81,11 +82,11 @@ class BaseThread(Thread):
self.log.info("Debug Report written to %s" % dump_name)
return dump_name
- def getFileDump(self, pyfile):
+ def getPluginDump(self, plugin):
dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
- self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc())
+ self.m.core.api.getServerVersion(), plugin.__name__, plugin.__version__, format_exc())
- tb = exc_info()[2]
+ tb = sys.exc_info()[2]
stack = []
while tb:
stack.append(tb.tb_frame)
@@ -109,8 +110,8 @@ class BaseThread(Thread):
dump += "\n\nPLUGIN OBJECT DUMP: \n\n"
- for name in dir(pyfile.plugin):
- attr = getattr(pyfile.plugin, name)
+ for name in dir(plugin):
+ attr = getattr(plugin, name)
if not name.endswith("__") and type(attr) != MethodType:
dump += "\t%20s = " % name
try:
@@ -118,7 +119,10 @@ class BaseThread(Thread):
except Exception, e:
dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
- dump += "\nPYFILE OBJECT DUMP: \n\n"
+ return dump
+
+ def getFileDump(self, pyfile):
+ dump = "PYFILE OBJECT DUMP: \n\n"
for name in dir(pyfile):
attr = getattr(pyfile, name)
@@ -129,14 +133,13 @@ class BaseThread(Thread):
except Exception, e:
dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
- dump += "\n\nCONFIG: \n\n"
- dump += pformat(self.m.core.config.values) + "\n"
-
return dump
- #TODO
- def getPluginDump(self, plugin):
- return ""
-
def getSystemDump(self):
- return ""
+ dump = "SYSTEM:\n\n"
+ dump += """Platform: %s
+Version: %s
+Encoding: %s
+FS-Encoding: %s
+ """ % (sys.platform, sys.version, sys.getdefaultencoding(), sys.getfilesystemencoding())
+ return dump
diff --git a/pyload/threads/DecrypterThread.py b/pyload/threads/DecrypterThread.py
index e4df2ee75..e8b889ac8 100644
--- a/pyload/threads/DecrypterThread.py
+++ b/pyload/threads/DecrypterThread.py
@@ -2,10 +2,9 @@
# -*- coding: utf-8 -*-
from time import sleep
-from traceback import print_exc
-from pyload.utils import uniqify
-from pyload.plugins.Base import Retry
+from pyload.utils import uniqify, accumulate
+from pyload.plugins.Base import Abort, Retry
from pyload.plugins.Crypter import Package
from BaseThread import BaseThread
@@ -14,30 +13,35 @@ class DecrypterThread(BaseThread):
"""thread for decrypting"""
def __init__(self, manager, data, pid):
- """constructor"""
+ # TODO: owner
BaseThread.__init__(self, manager)
+ # [... (plugin, url) ...]
self.data = data
self.pid = pid
self.start()
def run(self):
- plugin_map = {}
- for url, plugin in self.data:
- if plugin in plugin_map:
- plugin_map[plugin].append(url)
- else:
- plugin_map[plugin] = [url]
+ pack = self.m.core.files.getPackage(self.pid)
+ links, packages = self.decrypt(accumulate(self.data), pack.password)
- self.decrypt(plugin_map)
+ if links:
+ self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(links), "name": pack.name})
+ self.m.core.api.addFiles(self.pid, [l.url for l in links])
- def decrypt(self, plugin_map):
- pack = self.m.core.files.getPackage(self.pid)
+ # TODO: add single package into this one and rename it?
+ # TODO: nested packages
+ for p in packages:
+ self.m.core.api.addPackage(p.name, p.getURLs(), pack.password)
+
+ def decrypt(self, plugin_map, password=None):
result = []
+ # TODO QUEUE_DECRYPT
+
for name, urls in plugin_map.iteritems():
klass = self.m.core.pluginManager.loadClass("crypter", name)
- plugin = klass(self.m.core, pack, pack.password)
+ plugin = klass(self.m.core, password)
plugin_result = []
try:
@@ -46,36 +50,37 @@ class DecrypterThread(BaseThread):
except Retry:
sleep(1)
plugin_result = plugin._decrypt(urls)
+ except Abort:
+ plugin.logInfo(_("Decrypting aborted"))
except Exception, e:
plugin.logError(_("Decrypting failed"), e)
- if self.m.core.debug:
- print_exc()
+ if self.core.debug:
+ self.core.print_exc()
self.writeDebugReport(plugin.__name__, plugin=plugin)
+ finally:
+ plugin.clean()
plugin.logDebug("Decrypted", plugin_result)
result.extend(plugin_result)
- #TODO package names are optional
- result = uniqify(result)
+ # generated packages
pack_names = {}
+ # urls without package
urls = []
+ # merge urls and packages
for p in result:
if isinstance(p, Package):
if p.name in pack_names:
pack_names[p.name].urls.extend(p.urls)
else:
- pack_names[p.name] = p
+ if not p.name:
+ urls.append(p)
+ else:
+ pack_names[p.name] = p
else:
urls.append(p)
- if urls:
- self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name})
- self.m.core.api.addFiles(self.pid, urls)
-
- for p in pack_names.itervalues():
- self.m.core.api.addPackage(p.name, p.urls, pack.password)
-
- if not result:
- self.log.info(_("No links decrypted"))
+ urls = uniqify(urls)
+ return urls, pack_names.values() \ No newline at end of file
diff --git a/pyload/threads/InfoThread.py b/pyload/threads/InfoThread.py
index fb4bdf11e..b62596ad3 100644
--- a/pyload/threads/InfoThread.py
+++ b/pyload/threads/InfoThread.py
@@ -9,25 +9,23 @@ from pyload.utils.packagetools import parseNames
from pyload.utils import has_method, accumulate
from BaseThread import BaseThread
+from DecrypterThread import DecrypterThread
-class InfoThread(BaseThread):
- def __init__(self, manager, data, pid=-1, rid=-1):
- """Constructor"""
- BaseThread.__init__(self, manager)
- self.data = data
- self.pid = pid # package id
- # [ .. (name, plugin) .. ]
-
- self.rid = rid #result id
+class InfoThread(DecrypterThread):
+ def __init__(self, manager, owner, data, pid=-1, oc=None):
+ BaseThread.__init__(self, manager, owner)
- self.cache = [] #accumulated data
+ # [... (plugin, url) ...]
+ self.data = data
+ self.pid = pid
+ self.oc = oc # online check
+ # urls that already have a package name
+ self.names = {}
self.start()
def run(self):
- """run method"""
-
plugins = accumulate(self.data)
crypter = {}
@@ -37,93 +35,82 @@ class InfoThread(BaseThread):
crypter[name] = plugins[name]
del plugins[name]
- #directly write to database
- if self.pid > -1:
- for pluginname, urls in plugins.iteritems():
- plugin = self.m.core.pluginManager.getPluginModule(pluginname)
- klass = self.m.core.pluginManager.getPluginClass(pluginname)
- if has_method(klass, "getInfo"):
- self.fetchForPlugin(pluginname, klass, urls, self.updateDB)
- self.m.core.files.save()
- elif has_method(plugin, "getInfo"):
- self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead")
- self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
- self.m.core.files.save()
-
- else: #post the results
- for name, urls in crypter.iteritems():
- #attach container content
- try:
- data = self.decrypt(name, urls)
- except:
- print_exc()
- self.m.log.error("Could not decrypt content.")
- data = []
-
- accumulate(data, plugins)
-
- self.m.infoResults[self.rid] = {}
-
- for pluginname, urls in plugins.iteritems():
- plugin = self.m.core.pluginManager.getPluginModule(pluginname)
- klass = self.m.core.pluginManager.getPluginClass(pluginname)
- if has_method(klass, "getInfo"):
- self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
- #force to process cache
- if self.cache:
- self.updateResult(pluginname, [], True)
- elif has_method(plugin, "getInfo"):
- self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead")
- self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
- #force to process cache
- if self.cache:
- self.updateResult(pluginname, [], True)
- else:
- #generate default result
- result = [(url, 0, 3, url) for url in urls]
-
- self.updateResult(pluginname, result, True)
-
- self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {}
-
+ if crypter:
+ # decrypt them
+ links, packages = self.decrypt(crypter)
+ # push these as initial result and save package names
+ self.updateResult(links)
+ for pack in packages:
+ for url in pack.getURLs():
+ self.names[url] = pack.name
+
+ links.extend(pack.links)
+ self.updateResult(pack.links)
+
+ # TODO: no plugin information pushed to GUI
+ # parse links and merge
+ hoster, crypter = self.m.core.pluginManager.parseUrls([l.url for l in links])
+ accumulate(hoster + crypter, plugins)
+
+ # db or info result
+ cb = self.updateDB if self.pid > 1 else self.updateResult
+
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPluginModule(pluginname)
+ klass = self.m.core.pluginManager.getPluginClass(pluginname)
+ if has_method(klass, "getInfo"):
+ self.fetchForPlugin(klass, urls, cb)
+ # TODO: this branch can be removed in the future
+ elif has_method(plugin, "getInfo"):
+ self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead")
+ self.fetchForPlugin(plugin, urls, cb)
+
+ self.oc.done = True
+ self.names.clear()
self.m.timestamp = time() + 5 * 60
-
- def updateDB(self, plugin, result):
- self.m.core.files.updateFileInfo(result, self.pid)
-
- def updateResult(self, plugin, result, force=False):
- #parse package name and generate result
- #accumulate results
-
- self.cache.extend(result)
-
- if len(self.cache) >= 20 or force:
- #used for package generating
- tmp = [(name, LinkStatus(url, name, plugin, int(size), status))
- for name, size, status, url in self.cache]
-
- data = parseNames(tmp)
- self.m.setInfoResults(self.rid, data)
-
- self.cache = []
-
- def updateCache(self, plugin, result):
- self.cache.extend(result)
-
- def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
+ def updateDB(self, result):
+ # writes results to db
+ # convert link info to tuples
+ info = [(l.name, l.size, l.status, l.url) for l in result if not l.hash]
+ info_hash = [(l.name, l.size, l.status, l.hash, l.url) for l in result if l.hash]
+ if info:
+ self.m.core.files.updateFileInfo(info, self.pid)
+ if info_hash:
+ self.m.core.files.updateFileInfo(info_hash, self.pid)
+
+ def updateResult(self, result):
+ tmp = {}
+ parse = []
+ # separate these with name and without
+ for link in result:
+ if link.url in self.names:
+ tmp[link] = self.names[link.url]
+ else:
+ parse.append(link)
+
+ data = parseNames([(link.name, link) for link in parse])
+ # merge in packages that already have a name
+ data = accumulate(tmp.iteritems(), data)
+
+ self.m.setInfoResults(self.oc, data)
+
+ def fetchForPlugin(self, plugin, urls, cb):
+ """executes info fetching for given plugin and urls"""
+ # also works on module names
+ pluginname = plugin.__name__.split(".")[-1]
try:
- result = [] #result loaded from cache
+ cached = [] #results loaded from cache
process = [] #urls to process
for url in urls:
if url in self.m.infoCache:
- result.append(self.m.infoCache[url])
+ cached.append(self.m.infoCache[url])
else:
process.append(url)
- if result:
- self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
- cb(pluginname, result)
+ if cached:
+ self.m.log.debug("Fetched %d links from cache for %s" % (len(cached), pluginname))
+ cb(cached)
if process:
self.m.log.debug("Run Info Fetching for %s" % pluginname)
@@ -131,26 +118,26 @@ class InfoThread(BaseThread):
#result = [ .. (name, size, status, url) .. ]
if not type(result) == list: result = [result]
+ links = []
+ # Convert results to link statuses
for res in result:
- self.m.infoCache[res[3]] = res
+ if isinstance(res, LinkStatus):
+ links.append(res)
+ elif type(res) == tuple and len(res) == 4:
+ links.append(LinkStatus(res[3], res[0], int(res[1]), res[2], pluginname))
+ elif type(res) == tuple and len(res) == 5:
+ links.append(LinkStatus(res[3], res[0], int(res[1]), res[2], pluginname, res[4]))
+ else:
+ self.m.log.debug("Invalid getInfo result: " + result)
+
+ # put them on the cache
+ for link in links:
+ self.m.infoCache[link.url] = link
- cb(pluginname, result)
+ cb(links)
self.m.log.debug("Finished Info Fetching for %s" % pluginname)
except Exception, e:
self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
{"name": pluginname, "err": str(e)})
- if self.m.core.debug:
- print_exc()
-
- # generate default results
- if err:
- result = [(url, 0, 3, url) for url in urls]
- cb(pluginname, result)
-
- def decrypt(self, plugin, urls):
- self.m.log.debug("Decrypting %s" % plugin)
- klass = self.m.core.pluginManager.loadClass("crypter", plugin)
- urls = klass.decrypt(self.core, urls)
- data, crypter = self.m.core.pluginManager.parseUrls(urls)
- return data + crypter
+ self.core.print_exc() \ No newline at end of file
diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py
index 9ad23138a..ff8bfe8d7 100644
--- a/pyload/threads/ThreadManager.py
+++ b/pyload/threads/ThreadManager.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
###############################################################################
-# Copyright(c) 2008-2012 pyLoad Team
+# Copyright(c) 2008-2013 pyLoad Team
# http://www.pyload.org
#
# This file is part of pyLoad.
@@ -24,9 +24,8 @@ from time import sleep, time
from traceback import print_exc
from random import choice
-import pycurl
-
from pyload.datatypes.PyFile import PyFile
+from pyload.datatypes.OnlineCheck import OnlineCheck
from pyload.network.RequestFactory import getURL
from pyload.utils import lock, uniqify
from pyload.utils.fs import free_space
@@ -63,13 +62,12 @@ class ThreadManager:
# pool of ids for online check
self.resultIDs = 0
- # threads which are fetching hoster results
+ # saved online checks
self.infoResults = {}
+
# timeout for cache purge
self.timestamp = 0
- pycurl.global_init(pycurl.GLOBAL_DEFAULT)
-
for i in range(self.core.config.get("download", "max_downloads")):
self.createThread()
@@ -83,23 +81,26 @@ class ThreadManager:
def createInfoThread(self, data, pid):
""" start a thread which fetches online status and other info's """
self.timestamp = time() + 5 * 60
- if data: InfoThread(self, data, pid)
+ if data: InfoThread(self, None, data, pid)
@lock
- def createResultThread(self, data):
+ def createResultThread(self, user, data):
""" creates a thread to fetch online status, returns result id """
self.timestamp = time() + 5 * 60
rid = self.resultIDs
self.resultIDs += 1
+ oc = OnlineCheck(rid, user)
+ self.infoResults[rid] = oc
+
# maps url to plugin
urls = []
for links in data.itervalues():
for link in links:
urls.append((link.url, link.plugin))
- InfoThread(self, urls, rid=rid)
+ InfoThread(self, user, urls, oc=oc)
return rid
@@ -108,23 +109,13 @@ class ThreadManager:
""" Start decrypting of entered data, all links in one package are accumulated to one thread."""
if data: DecrypterThread(self, data, pid)
-
@lock
def getInfoResult(self, rid):
- """returns result and clears it"""
- self.timestamp = time() + 5 * 60
- # TODO: restrict user to his own results
- if rid in self.infoResults:
- data = self.infoResults[rid]
- self.infoResults[rid] = {}
- return data
- else:
- return {}
+ return self.infoResults.get(rid)
- @lock
- def setInfoResults(self, rid, result):
- self.core.evm.dispatchEvent("linkcheck:updated", rid, result)
- self.infoResults[rid].update(result)
+ def setInfoResults(self, oc, result):
+ self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner)
+ oc.update(result)
def getActiveDownloads(self, user=None):
# TODO: user context
@@ -220,8 +211,7 @@ class ThreadManager:
self.log.warning(_("Failed executing reconnect script!"))
self.core.config["reconnect"]["activated"] = False
self.reconnecting.clear()
- if self.core.debug:
- print_exc()
+ self.core.print_exc()
return
reconn.wait()
@@ -268,6 +258,8 @@ class ThreadManager:
""" make a global curl cleanup (currently unused) """
if self.processingIds():
return False
+ import pycurl
+
pycurl.global_cleanup()
pycurl.global_init(pycurl.GLOBAL_DEFAULT)
self.downloaded = 0
@@ -317,7 +309,3 @@ class ThreadManager:
if occ not in self.core.files.jobCache:
self.core.files.jobCache[occ] = []
self.core.files.jobCache[occ].append(job.id)
-
- def cleanup(self):
- """do global cleanup, should be called when finished with pycurl"""
- pycurl.global_cleanup()