From d35c003cc53d4723d1dfe0d81eeb9bea78cee594 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sat, 31 Dec 2011 16:01:24 +0100 Subject: new crypter plugin API, now decrypting possible for now. --- module/threads/BaseThread.py | 117 ++++++++++++++ module/threads/DecrypterThread.py | 35 +++++ module/threads/DownloadThread.py | 215 ++++++++++++++++++++++++++ module/threads/HookThread.py | 56 +++++++ module/threads/InfoThread.py | 215 ++++++++++++++++++++++++++ module/threads/ThreadManager.py | 311 ++++++++++++++++++++++++++++++++++++++ module/threads/__init__.py | 0 7 files changed, 949 insertions(+) create mode 100644 module/threads/BaseThread.py create mode 100644 module/threads/DecrypterThread.py create mode 100644 module/threads/DownloadThread.py create mode 100644 module/threads/HookThread.py create mode 100644 module/threads/InfoThread.py create mode 100644 module/threads/ThreadManager.py create mode 100644 module/threads/__init__.py (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py new file mode 100644 index 000000000..b5856c856 --- /dev/null +++ b/module/threads/BaseThread.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from threading import Thread +from time import strftime, gmtime +from sys import exc_info +from types import MethodType +from pprint import pformat +from traceback import format_exc + +from module.utils.fs import listdir, join, save_join, stat + +class BaseThread(Thread): + """abstract base class for thread types""" + + def __init__(self, manager): + """Constructor""" + Thread.__init__(self) + self.setDaemon(True) + self.m = manager #thread manager + self.log = manager.core.log + + + def writeDebugReport(self, pyfile): + """ writes a debug report to disk """ + + dump_name = "debug_%s_%s.zip" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S")) + dump = self.getDebugDump(pyfile) + + try: + import zipfile + + zip = zipfile.ZipFile(dump_name, "w") + + for f in listdir(join("tmp", pyfile.pluginname)): + try: + # avoid encoding errors + zip.write(join("tmp", pyfile.pluginname, f), save_join(pyfile.pluginname, f)) + except: + pass + + info = zipfile.ZipInfo(save_join(pyfile.pluginname, "debug_Report.txt"), gmtime()) + info.external_attr = 0644 << 16L # change permissions + + zip.writestr(info, dump) + zip.close() + + if not stat(dump_name).st_size: + raise Exception("Empty Zipfile") + + except Exception, e: + self.log.debug("Error creating zip file: %s" % e) + + dump_name = dump_name.replace(".zip", ".txt") + f = open(dump_name, "wb") + f.write(dump) + f.close() + + self.log.info("Debug Report written to %s" % dump_name) + + def getDebugDump(self, pyfile): + dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( + self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) + + tb = exc_info()[2] + stack = [] + while tb: + stack.append(tb.tb_frame) + tb = tb.tb_next + + for frame in stack[1:]: + dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, + frame.f_code.co_filename, + frame.f_lineno) + + for key, value in frame.f_locals.items(): + dump += "\t%20s = " % key + try: + dump += pformat(value) + "\n" + except Exception, e: + dump += " " + str(e) + "\n" + + del frame + + del stack #delete it just to be sure... + + dump += "\n\nPLUGIN OBJECT DUMP: \n\n" + + for name in dir(pyfile.plugin): + attr = getattr(pyfile.plugin, name) + if not name.endswith("__") and type(attr) != MethodType: + dump += "\t%20s = " % name + try: + dump += pformat(attr) + "\n" + except Exception, e: + dump += " " + str(e) + "\n" + + dump += "\nPYFILE OBJECT DUMP: \n\n" + + for name in dir(pyfile): + attr = getattr(pyfile, name) + if not name.endswith("__") and type(attr) != MethodType: + dump += "\t%20s = " % name + try: + dump += pformat(attr) + "\n" + except Exception, e: + dump += " " + str(e) + "\n" + + dump += "\n\nCONFIG: \n\n" + dump += pformat(self.m.core.config.values) + "\n" + + return dump + + def clean(self, pyfile): + """ set thread unactive and release pyfile """ + self.active = False + pyfile.release() diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py new file mode 100644 index 000000000..5ce59a65e --- /dev/null +++ b/module/threads/DecrypterThread.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from BaseThread import BaseThread + +class DecrypterThread(BaseThread): + """thread for decrypting""" + + def __init__(self, manager, data, package): + """constructor""" + BaseThread.__init__(self, manager) + self.queue = data + self.package = package + + self.m.log.debug("Starting Decrypt thread") + + self.start() + + def add(self, data): + self.queue.extend(data) + + def run(self): + plugin_map = {} + for plugin, url in self.queue: + if plugin in plugin_map: + plugin_map[plugin].append(url) + else: + plugin_map[plugin] = [url] + + + self.decrypt(plugin_map) + + def decrypt(self, plugin_map): + for name, urls in plugin_map.iteritems(): + p = self.m.core.pluginManager.loadClass("crypter", name) diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py new file mode 100644 index 000000000..3d444686b --- /dev/null +++ b/module/threads/DownloadThread.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . + + @author: RaNaN +""" + +from Queue import Queue +from time import sleep, time +from traceback import print_exc +from sys import exc_clear +from pycurl import error + +from module.plugins.Base import Fail, Retry +from module.plugins.Hoster import Abort, Reconnect, SkipDownload + +from BaseThread import BaseThread + +class DownloadThread(BaseThread): + """thread for downloading files from 'real' hoster plugins""" + + def __init__(self, manager): + """Constructor""" + BaseThread.__init__(self, manager) + + self.queue = Queue() # job queue + self.active = False + + self.start() + + def run(self): + """run method""" + pyfile = None + + while True: + del pyfile + self.active = self.queue.get() + pyfile = self.active + + if self.active == "quit": + self.active = False + self.m.threads.remove(self) + return True + + try: + if not pyfile.hasPlugin(): continue + #this pyfile was deleted while queueing + + pyfile.plugin.checkForSameFiles(starting=True) + self.m.log.info(_("Download starts: %s" % pyfile.name)) + + # start download + self.m.core.hookManager.downloadPreparing(pyfile) + pyfile.plugin.preprocessing(self) + + self.m.log.info(_("Download finished: %s") % pyfile.name) + self.m.core.hookManager.downloadFinished(pyfile) + self.m.core.files.checkPackageFinished(pyfile) + + except NotImplementedError: + self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) + pyfile.setStatus("failed") + pyfile.error = "Plugin does not work" + self.clean(pyfile) + continue + + except Abort: + try: + self.m.log.info(_("Download aborted: %s") % pyfile.name) + except: + pass + + pyfile.setStatus("aborted") + + self.clean(pyfile) + continue + + except Reconnect: + self.queue.put(pyfile) + #pyfile.req.clearCookies() + + while self.m.reconnecting.isSet(): + sleep(0.5) + + continue + + except Retry, e: + reason = e.args[0] + self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) + self.queue.put(pyfile) + continue + + except Fail, e: + msg = e.args[0] + + if msg == "offline": + pyfile.setStatus("offline") + self.m.log.warning(_("Download is offline: %s") % pyfile.name) + elif msg == "temp. offline": + pyfile.setStatus("temp. offline") + self.m.log.warning(_("Download is temporary offline: %s") % pyfile.name) + else: + pyfile.setStatus("failed") + self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) + pyfile.error = msg + + self.m.core.hookManager.downloadFailed(pyfile) + self.clean(pyfile) + continue + + except error, e: + if len(e.args) == 2: + code, msg = e.args + else: + code = 0 + msg = e.args + + self.m.log.debug("pycurl exception %s: %s" % (code, msg)) + + if code in (7, 18, 28, 52, 56): + self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) + wait = time() + 60 + + pyfile.waitUntil = wait + pyfile.setStatus("waiting") + while time() < wait: + sleep(1) + if pyfile.abort: + break + + if pyfile.abort: + self.m.log.info(_("Download aborted: %s") % pyfile.name) + pyfile.setStatus("aborted") + + self.clean(pyfile) + else: + self.queue.put(pyfile) + + continue + + else: + pyfile.setStatus("failed") + self.m.log.error("pycurl error %s: %s" % (code, msg)) + if self.m.core.debug: + print_exc() + self.writeDebugReport(pyfile) + + self.m.core.hookManager.downloadFailed(pyfile) + + self.clean(pyfile) + continue + + except SkipDownload, e: + pyfile.setStatus("skipped") + + self.m.log.info( + _("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message}) + + self.clean(pyfile) + + self.m.core.files.checkPackageFinished(pyfile) + + self.active = False + self.m.core.files.save() + + continue + + + except Exception, e: + pyfile.setStatus("failed") + self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) + pyfile.error = str(e) + + if self.m.core.debug: + print_exc() + self.writeDebugReport(pyfile) + + self.m.core.hookManager.downloadFailed(pyfile) + self.clean(pyfile) + continue + + finally: + self.m.core.files.save() + pyfile.checkIfProcessed() + exc_clear() + + + #pyfile.plugin.req.clean() + + self.active = False + pyfile.finishIfDone() + self.m.core.files.save() + + + def put(self, job): + """assing job to thread""" + self.queue.put(job) + + + def stop(self): + """stops the thread""" + self.put("quit") \ No newline at end of file diff --git a/module/threads/HookThread.py b/module/threads/HookThread.py new file mode 100644 index 000000000..fe4a2a651 --- /dev/null +++ b/module/threads/HookThread.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from copy import copy + +from BaseThread import BaseThread + +class HookThread(BaseThread): + """thread for hooks""" + + def __init__(self, m, function, args, kwargs): + """Constructor""" + BaseThread.__init__(self, m) + + self.f = function + self.args = args + self.kwargs = kwargs + + self.active = [] + + m.localThreads.append(self) + + self.start() + + def getActiveFiles(self): + return self.active + + def addActive(self, pyfile): + """ Adds a pyfile to active list and thus will be displayed on overview""" + if pyfile not in self.active: + self.active.append(pyfile) + + def finishFile(self, pyfile): + if pyfile in self.active: + self.active.remove(pyfile) + + pyfile.finishIfDone() + + def run(self): + try: + try: + self.kwargs["thread"] = self + self.f(*self.args, **self.kwargs) + except TypeError, e: + #dirty method to filter out exceptions + if "unexpected keyword argument 'thread'" not in e.args[0]: + raise + + del self.kwargs["thread"] + self.f(*self.args, **self.kwargs) + finally: + local = copy(self.active) + for x in local: + self.finishFile(x) + + self.m.localThreads.remove(self) \ No newline at end of file diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py new file mode 100644 index 000000000..4cba7da38 --- /dev/null +++ b/module/threads/InfoThread.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from time import time +from traceback import print_exc + +from module.Api import OnlineStatus +from module.PyFile import PyFile +from module.common.packagetools import parseNames + +from BaseThread import BaseThread + +class InfoThread(BaseThread): + def __init__(self, manager, data, pid=-1, rid=-1, add=False): + """Constructor""" + BaseThread.__init__(self, manager) + + self.data = data + self.pid = pid # package id + # [ .. (name, plugin) .. ] + + self.rid = rid #result id + self.add = add #add packages instead of return result + + self.cache = [] #accumulated data + + self.start() + + def run(self): + """run method""" + + plugins = {} + container = [] + + for url, plugin in self.data: + if plugin in plugins: + plugins[plugin].append(url) + else: + plugins[plugin] = [url] + + + # filter out container plugins + for name in self.m.core.pluginManager.getPlugins("container"): + if name in plugins: + container.extend([(name, url) for url in plugins[name]]) + + del plugins[name] + + #directly write to database + if self.pid > -1: + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + if hasattr(plugin, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) + self.m.core.files.save() + + elif self.add: + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + if hasattr(plugin, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateCache, True) + + else: + #generate default result + result = [(url, 0, 3, url) for url in urls] + + self.updateCache(pluginname, result) + + packs = parseNames([(name, url) for name, x, y, url in self.cache]) + + self.m.log.debug("Fetched and generated %d packages" % len(packs)) + + for k, v in packs: + self.m.core.api.addPackage(k, v) + + #empty cache + del self.cache[:] + + else: #post the results + + + for name, url in container: + #attach container content + try: + data = self.decryptContainer(name, url) + except: + print_exc() + self.m.log.error("Could not decrypt container.") + data = [] + + for url, plugin in data: + if plugin in plugins: + plugins[plugin].append(url) + else: + plugins[plugin] = [url] + + self.m.infoResults[self.rid] = {} + + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + if hasattr(plugin, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + + #force to process cache + if self.cache: + self.updateResult(pluginname, [], True) + + else: + #generate default result + result = [(url, 0, 3, url) for url in urls] + + self.updateResult(pluginname, result, True) + + self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} + + self.m.timestamp = time() + 5 * 60 + + + def updateDB(self, plugin, result): + self.m.core.files.updateFileInfo(result, self.pid) + + def updateResult(self, plugin, result, force=False): + #parse package name and generate result + #accumulate results + + self.cache.extend(result) + + if len(self.cache) >= 20 or force: + #used for package generating + tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size)))) + for name, size, status, url in self.cache] + + data = parseNames(tmp) + result = {} + for k, v in data.iteritems(): + for url, status in v: + status.packagename = k + result[url] = status + + self.m.setInfoResults(self.rid, result) + + self.cache = [] + + def updateCache(self, plugin, result): + self.cache.extend(result) + + def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): + try: + result = [] #result loaded from cache + process = [] #urls to process + for url in urls: + if url in self.m.infoCache: + result.append(self.m.infoCache[url]) + else: + process.append(url) + + if result: + self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) + cb(pluginname, result) + + if process: + self.m.log.debug("Run Info Fetching for %s" % pluginname) + for result in plugin.getInfo(process): + #result = [ .. (name, size, status, url) .. ] + if not type(result) == list: result = [result] + + for res in result: + self.m.infoCache[res[3]] = res + + cb(pluginname, result) + + self.m.log.debug("Finished Info Fetching for %s" % pluginname) + except Exception, e: + self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % + {"name": pluginname, "err": str(e)}) + if self.m.core.debug: + print_exc() + + # generate default results + if err: + result = [(url, 0, 3, url) for url in urls] + cb(pluginname, result) + + + def decryptContainer(self, plugin, url): + data = [] + # only works on container plugins + + self.m.log.debug("Pre decrypting %s with %s" % (url, plugin)) + + # dummy pyfile + pyfile = PyFile(self.m.core.files, -1, url, url, 0, 0, "", plugin, -1, -1) + + pyfile.initPlugin() + + # little plugin lifecycle + try: + pyfile.plugin.setup() + pyfile.plugin.loadToDisk() + pyfile.plugin.decrypt(pyfile) + pyfile.plugin.deleteTmp() + + for pack in pyfile.plugin.packages: + pyfile.plugin.urls.extend(pack[1]) + + data, crypter = self.m.core.pluginManager.parseUrls(pyfile.plugin.urls) + + self.m.log.debug("Got %d links." % len(data)) + + except Exception, e: + self.m.log.debug("Pre decrypting error: %s" % str(e)) + finally: + pyfile.release() + + return data diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py new file mode 100644 index 000000000..c32286eb9 --- /dev/null +++ b/module/threads/ThreadManager.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . + + @author: RaNaN +""" + +from os.path import exists, join +import re +from subprocess import Popen +from threading import Event, Lock +from time import sleep, time +from traceback import print_exc +from random import choice + +import pycurl + +from module.PyFile import PyFile +from module.network.RequestFactory import getURL +from module.utils import lock +from module.utils.fs import free_space + +from DecrypterThread import DecrypterThread +from DownloadThread import DownloadThread +from InfoThread import InfoThread + +class ThreadManager: + """manages the download threads, assign jobs, reconnect etc""" + + + def __init__(self, core): + """Constructor""" + self.core = core + self.log = core.log + + self.threads = [] # thread list + self.localThreads = [] #hook+decrypter threads + + self.pause = True + + self.reconnecting = Event() + self.reconnecting.clear() + self.downloaded = 0 #number of files downloaded since last cleanup + + self.lock = Lock() + + # some operations require to fetch url info from hoster, so we caching them so it wont be done twice + # contains a timestamp and will be purged after timeout + self.infoCache = {} + + # pool of ids for online check + self.resultIDs = 0 + + # threads which are fetching hoster results + self.infoResults = {} + # timeout for cache purge + self.timestamp = 0 + + pycurl.global_init(pycurl.GLOBAL_DEFAULT) + + for i in range(0, self.core.config.get("download", "max_downloads")): + self.createThread() + + + def createThread(self): + """create a download thread""" + + thread = DownloadThread(self) + self.threads.append(thread) + + def createInfoThread(self, data, pid): + """ start a thread whichs fetches online status and other infos """ + self.timestamp = time() + 5 * 60 + + InfoThread(self, data, pid) + + @lock + def createResultThread(self, data, add=False): + """ creates a thread to fetch online status, returns result id """ + self.timestamp = time() + 5 * 60 + + rid = self.resultIDs + self.resultIDs += 1 + + InfoThread(self, data, rid=rid, add=add) + + return rid + + @lock + def createDecryptThread(self, data, pid): + """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" + DecrypterThread(self, data, pid) + + + @lock + def getInfoResult(self, rid): + """returns result and clears it""" + self.timestamp = time() + 5 * 60 + + if rid in self.infoResults: + data = self.infoResults[rid] + self.infoResults[rid] = {} + return data + else: + return {} + + @lock + def setInfoResults(self, rid, result): + self.infoResults[rid].update(result) + + def getActiveFiles(self): + active = [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] + + for t in self.localThreads: + active.extend(t.getActiveFiles()) + + return active + + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.id for x in self.getActiveFiles()] + + + def work(self): + """run all task which have to be done (this is for repetivive call by core)""" + try: + self.tryReconnect() + except Exception, e: + self.log.error(_("Reconnect Failed: %s") % str(e) ) + self.reconnecting.clear() + if self.core.debug: + print_exc() + self.checkThreadCount() + + try: + self.assignJob() + except Exception, e: + self.log.warning("Assign job error", e) + if self.core.debug: + print_exc() + + sleep(0.5) + self.assignJob() + #it may be failed non critical so we try it again + + if (self.infoCache or self.infoResults) and self.timestamp < time(): + self.infoCache.clear() + self.infoResults.clear() + self.log.debug("Cleared Result cache") + + def tryReconnect(self): + """checks if reconnect needed""" + + if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): + return False + + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] + + if not (0 < active.count(True) == len(active)): + return False + + if not exists(self.core.config['reconnect']['method']): + if exists(join(pypath, self.core.config['reconnect']['method'])): + self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) + else: + self.core.config["reconnect"]["activated"] = False + self.log.warning(_("Reconnect script not found!")) + return + + self.reconnecting.set() + + #Do reconnect + self.log.info(_("Starting reconnect")) + + while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: + sleep(0.25) + + ip = self.getIP() + + self.core.hookManager.beforeReconnecting(ip) + + self.log.debug("Old IP: %s" % ip) + + try: + reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) + except: + self.log.warning(_("Failed executing reconnect script!")) + self.core.config["reconnect"]["activated"] = False + self.reconnecting.clear() + if self.core.debug: + print_exc() + return + + reconn.wait() + sleep(1) + ip = self.getIP() + self.core.hookManager.afterReconnecting(ip) + + self.log.info(_("Reconnected, new IP: %s") % ip) + + self.reconnecting.clear() + + def getIP(self): + """retrieve current ip""" + services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), + ("http://checkip.dyndns.org/",".*Current IP Address: (\S+).*")] + + ip = "" + for i in range(10): + try: + sv = choice(services) + ip = getURL(sv[0]) + ip = re.match(sv[1], ip).group(1) + break + except: + ip = "" + sleep(1) + + return ip + + def checkThreadCount(self): + """checks if there are need for increasing or reducing thread count""" + + if len(self.threads) == self.core.config.get("download", "max_downloads"): + return True + elif len(self.threads) < self.core.config.get("download", "max_downloads"): + self.createThread() + else: + free = [x for x in self.threads if not x.active] + if free: + free[0].put("quit") + + + def cleanPycurl(self): + """ make a global curl cleanup (currently ununused) """ + if self.processingIds(): + return False + pycurl.global_cleanup() + pycurl.global_init(pycurl.GLOBAL_DEFAULT) + self.downloaded = 0 + self.log.debug("Cleaned up pycurl") + return True + + + def assignJob(self): + """assing a job to a thread if possible""" + + if self.pause or not self.core.api.isTimeDownload(): return + + #if self.downloaded > 20: + # if not self.cleanPyCurl(): return + + free = [x for x in self.threads if not x.active] + + inuse = set([(x.active.pluginname,self.getLimit(x)) for x in self.threads if x.active and x.active.hasPlugin() and x.active.plugin.account]) + inuse = map(lambda x : (x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) ,inuse) + onlimit = [x[0] for x in inuse if 0 < x[1] <= x[2]] + + occ = [x.active.pluginname for x in self.threads if x.active and x.active.hasPlugin() and not x.active.plugin.multiDL] + onlimit + + occ.sort() + occ = tuple(set(occ)) + job = self.core.files.getJob(occ) + if job: + try: + job.initPlugin() + except Exception, e: + self.log.critical(str(e)) + print_exc() + job.setStatus("failed") + job.error = str(e) + job.release() + return + + spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 + if spaceLeft < self.core.config["general"]["min_free_space"]: + self.log.warning(_("Not enough space left on device")) + self.pause = True + + if free and not self.pause: + thread = free[0] + #self.downloaded += 1 + thread.put(job) + else: + #put job back + if occ not in self.core.files.jobCache: + self.core.files.jobCache[occ] = [] + self.core.files.jobCache[occ].append(job.id) + + def getLimit(self, thread): + limit = thread.active.plugin.account.options.get("limitDL","0") + if limit == "": limit = "0" + return int(limit) + + + def cleanup(self): + """do global cleanup, should be called when finished with pycurl""" + pycurl.global_cleanup() diff --git a/module/threads/__init__.py b/module/threads/__init__.py new file mode 100644 index 000000000..e69de29bb -- cgit v1.2.3 From 35742c2cb023ac49ab3056752d2040cdb030cc2b Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sun, 1 Jan 2012 13:36:59 +0100 Subject: Happy new Year ! --- module/threads/BaseThread.py | 30 ++++++++++++------ module/threads/DecrypterThread.py | 65 ++++++++++++++++++++++++++++++++------- module/threads/DownloadThread.py | 4 +-- module/threads/InfoThread.py | 51 ++++++++++++------------------ module/threads/ThreadManager.py | 11 +++---- 5 files changed, 101 insertions(+), 60 deletions(-) (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py index b5856c856..1ba3f7a9f 100644 --- a/module/threads/BaseThread.py +++ b/module/threads/BaseThread.py @@ -1,6 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import os +import sys +import locale + from threading import Thread from time import strftime, gmtime from sys import exc_info @@ -14,32 +18,33 @@ class BaseThread(Thread): """abstract base class for thread types""" def __init__(self, manager): - """Constructor""" Thread.__init__(self) self.setDaemon(True) self.m = manager #thread manager self.log = manager.core.log - - def writeDebugReport(self, pyfile): + def writeDebugReport(self, name, pyfile=None, plugin=None): """ writes a debug report to disk """ - dump_name = "debug_%s_%s.zip" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S")) - dump = self.getDebugDump(pyfile) + dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S")) + if pyfile: + dump = self.getFileDump(pyfile) + else: + dump = self.getPluginDump(plugin) try: import zipfile zip = zipfile.ZipFile(dump_name, "w") - for f in listdir(join("tmp", pyfile.pluginname)): + for f in listdir(join("tmp", name)): try: # avoid encoding errors - zip.write(join("tmp", pyfile.pluginname, f), save_join(pyfile.pluginname, f)) + zip.write(join("tmp", name, f), save_join(name, f)) except: pass - info = zipfile.ZipInfo(save_join(pyfile.pluginname, "debug_Report.txt"), gmtime()) + info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime()) info.external_attr = 0644 << 16L # change permissions zip.writestr(info, dump) @@ -58,7 +63,7 @@ class BaseThread(Thread): self.log.info("Debug Report written to %s" % dump_name) - def getDebugDump(self, pyfile): + def getFileDump(self, pyfile): dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) @@ -111,6 +116,13 @@ class BaseThread(Thread): return dump + #TODO + def getPluginDump(self, plugin): + return "" + + def getSystemDump(self): + return "" + def clean(self, pyfile): """ set thread unactive and release pyfile """ self.active = False diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py index 5ce59a65e..a1b7e4f38 100644 --- a/module/threads/DecrypterThread.py +++ b/module/threads/DecrypterThread.py @@ -1,35 +1,78 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from time import sleep +from traceback import print_exc + +from module.plugins.Base import Retry +from module.plugins.Crypter import Package + from BaseThread import BaseThread class DecrypterThread(BaseThread): """thread for decrypting""" - def __init__(self, manager, data, package): + def __init__(self, manager, data, pid): """constructor""" BaseThread.__init__(self, manager) - self.queue = data - self.package = package - - self.m.log.debug("Starting Decrypt thread") + self.data = data + self.pid = pid self.start() - def add(self, data): - self.queue.extend(data) - def run(self): plugin_map = {} - for plugin, url in self.queue: + for url, plugin in self.data: if plugin in plugin_map: plugin_map[plugin].append(url) else: plugin_map[plugin] = [url] - self.decrypt(plugin_map) def decrypt(self, plugin_map): + pack = self.m.core.files.getPackage(self.pid) + result = [] + for name, urls in plugin_map.iteritems(): - p = self.m.core.pluginManager.loadClass("crypter", name) + klass = self.m.core.pluginManager.loadClass("crypter", name) + plugin = klass(self.m.core, pack, pack.password) + plugin_result = [] + + try: + try: + plugin_result = plugin._decrypt(urls) + except Retry: + sleep(1) + plugin_result = plugin._decrypt(urls) + except Exception, e: + plugin.logError(_("Decrypting failed"), e) + if self.m.core.debug: + print_exc() + self.writeDebugReport(plugin.__name__, plugin=plugin) + + plugin.logDebug("Decrypted", plugin_result) + result.extend(plugin_result) + + pack_names = {} + urls = [] + + for p in result: + if isinstance(p, Package): + if p.name in pack_names: + pack_names[p.name].urls.extend(p.urls) + else: + pack_names[p.name] = p + else: + urls.append(p) + + if urls: + self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) + self.m.core.api.addFiles(self.pid, urls) + + for p in pack_names: + self.m.core.api.addPackage(p.name, p.urls, p.dest, pack.password) + + if not result: + self.log.info(_("No links decrypted")) + diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 3d444686b..638861338 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -156,7 +156,7 @@ class DownloadThread(BaseThread): self.m.log.error("pycurl error %s: %s" % (code, msg)) if self.m.core.debug: print_exc() - self.writeDebugReport(pyfile) + self.writeDebugReport(pyfile.pluginname, pyfile) self.m.core.hookManager.downloadFailed(pyfile) @@ -186,7 +186,7 @@ class DownloadThread(BaseThread): if self.m.core.debug: print_exc() - self.writeDebugReport(pyfile) + self.writeDebugReport(pyfile.pluginname, pyfile) self.m.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py index 4cba7da38..596153c4b 100644 --- a/module/threads/InfoThread.py +++ b/module/threads/InfoThread.py @@ -7,11 +7,12 @@ from traceback import print_exc from module.Api import OnlineStatus from module.PyFile import PyFile from module.common.packagetools import parseNames +from module.utils import has_method from BaseThread import BaseThread class InfoThread(BaseThread): - def __init__(self, manager, data, pid=-1, rid=-1, add=False): + def __init__(self, manager, data, pid=-1, rid=-1): """Constructor""" BaseThread.__init__(self, manager) @@ -20,7 +21,6 @@ class InfoThread(BaseThread): # [ .. (name, plugin) .. ] self.rid = rid #result id - self.add = add #add packages instead of return result self.cache = [] #accumulated data @@ -39,8 +39,8 @@ class InfoThread(BaseThread): plugins[plugin] = [url] - # filter out container plugins - for name in self.m.core.pluginManager.getPlugins("container"): + # filter out crypter plugins + for name in self.m.core.pluginManager.getPlugins("crypter"): if name in plugins: container.extend([(name, url) for url in plugins[name]]) @@ -50,35 +50,17 @@ class InfoThread(BaseThread): if self.pid > -1: for pluginname, urls in plugins.iteritems(): plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - if hasattr(plugin, "getInfo"): + klass = getattr(plugin, pluginname) + if has_method(klass, "getInfo"): + self.fetchForPlugin(pluginname, klass, urls, self.updateDB) + self.m.core.files.save() + elif has_method(plugin, "getInfo"): + self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead") self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) self.m.core.files.save() - elif self.add: - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - if hasattr(plugin, "getInfo"): - self.fetchForPlugin(pluginname, plugin, urls, self.updateCache, True) - - else: - #generate default result - result = [(url, 0, 3, url) for url in urls] - - self.updateCache(pluginname, result) - - packs = parseNames([(name, url) for name, x, y, url in self.cache]) - - self.m.log.debug("Fetched and generated %d packages" % len(packs)) - - for k, v in packs: - self.m.core.api.addPackage(k, v) - - #empty cache - del self.cache[:] - else: #post the results - - + #TODO: finer crypter control for name, url in container: #attach container content try: @@ -98,13 +80,18 @@ class InfoThread(BaseThread): for pluginname, urls in plugins.iteritems(): plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - if hasattr(plugin, "getInfo"): + klass = getattr(plugin, pluginname) + if has_method(klass, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + #force to process cache + if self.cache: + self.updateResult(pluginname, [], True) + elif has_method(plugin, "getInfo"): + self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) - #force to process cache if self.cache: self.updateResult(pluginname, [], True) - else: #generate default result result = [(url, 0, 3, url) for url in urls] diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index c32286eb9..612da2536 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -71,7 +71,7 @@ class ThreadManager: pycurl.global_init(pycurl.GLOBAL_DEFAULT) - for i in range(0, self.core.config.get("download", "max_downloads")): + for i in range(self.core.config.get("download", "max_downloads")): self.createThread() @@ -84,25 +84,24 @@ class ThreadManager: def createInfoThread(self, data, pid): """ start a thread whichs fetches online status and other infos """ self.timestamp = time() + 5 * 60 - - InfoThread(self, data, pid) + if data: InfoThread(self, data, pid) @lock - def createResultThread(self, data, add=False): + def createResultThread(self, data): """ creates a thread to fetch online status, returns result id """ self.timestamp = time() + 5 * 60 rid = self.resultIDs self.resultIDs += 1 - InfoThread(self, data, rid=rid, add=add) + InfoThread(self, data, rid=rid) return rid @lock def createDecryptThread(self, data, pid): """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" - DecrypterThread(self, data, pid) + if data: DecrypterThread(self, data, pid) @lock -- cgit v1.2.3 From 4a3a81b63cd85cc3dcd9669868a2079da65838a2 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Tue, 3 Jan 2012 20:41:23 +0100 Subject: fixes for old style decrypter --- module/threads/BaseThread.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py index 1ba3f7a9f..f4885aadc 100644 --- a/module/threads/BaseThread.py +++ b/module/threads/BaseThread.py @@ -12,7 +12,7 @@ from types import MethodType from pprint import pformat from traceback import format_exc -from module.utils.fs import listdir, join, save_join, stat +from module.utils.fs import listdir, join, save_join, stat, exists class BaseThread(Thread): """abstract base class for thread types""" @@ -37,17 +37,22 @@ class BaseThread(Thread): zip = zipfile.ZipFile(dump_name, "w") - for f in listdir(join("tmp", name)): - try: - # avoid encoding errors - zip.write(join("tmp", name, f), save_join(name, f)) - except: - pass + if exists(join("tmp", name)): + for f in listdir(join("tmp", name)): + try: + # avoid encoding errors + zip.write(join("tmp", name, f), save_join(name, f)) + except: + pass info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime()) info.external_attr = 0644 << 16L # change permissions - zip.writestr(info, dump) + + info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime()) + info.external_attr = 0644 << 16L + zip.writestr(info, self.getSystemDump()) + zip.close() if not stat(dump_name).st_size: -- cgit v1.2.3 From b877847094b0ba03a098dff0fd769eb456b48dd1 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Fri, 6 Jan 2012 17:54:53 +0100 Subject: several improvements, also closes #486, #487 --- module/threads/DecrypterThread.py | 2 ++ module/threads/HookThread.py | 9 ++++++++ module/threads/InfoThread.py | 48 +++++++++++---------------------------- 3 files changed, 24 insertions(+), 35 deletions(-) (limited to 'module/threads') diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py index a1b7e4f38..8edb97c34 100644 --- a/module/threads/DecrypterThread.py +++ b/module/threads/DecrypterThread.py @@ -4,6 +4,7 @@ from time import sleep from traceback import print_exc +from module.utils import uniqify from module.plugins.Base import Retry from module.plugins.Crypter import Package @@ -54,6 +55,7 @@ class DecrypterThread(BaseThread): plugin.logDebug("Decrypted", plugin_result) result.extend(plugin_result) + result = uniqify(result) pack_names = {} urls = [] diff --git a/module/threads/HookThread.py b/module/threads/HookThread.py index fe4a2a651..bffa72ca0 100644 --- a/module/threads/HookThread.py +++ b/module/threads/HookThread.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- from copy import copy +from traceback import print_exc from BaseThread import BaseThread @@ -48,6 +49,14 @@ class HookThread(BaseThread): del self.kwargs["thread"] self.f(*self.args, **self.kwargs) + except Exception, e: + if hasattr(self.f, "im_self"): + hook = self.f.im_self + hook.logError(_("An Error occured"), e) + if self.m.core.debug: + print_exc() + self.writeDebugReport(hook.__name__, plugin=hook) + finally: local = copy(self.active) for x in local: diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py index 596153c4b..c1e4458ef 100644 --- a/module/threads/InfoThread.py +++ b/module/threads/InfoThread.py @@ -30,7 +30,7 @@ class InfoThread(BaseThread): """run method""" plugins = {} - container = [] + crypter = {} for url, plugin in self.data: if plugin in plugins: @@ -42,8 +42,7 @@ class InfoThread(BaseThread): # filter out crypter plugins for name in self.m.core.pluginManager.getPlugins("crypter"): if name in plugins: - container.extend([(name, url) for url in plugins[name]]) - + crypter[name] = plugins[name] del plugins[name] #directly write to database @@ -60,11 +59,10 @@ class InfoThread(BaseThread): self.m.core.files.save() else: #post the results - #TODO: finer crypter control - for name, url in container: + for name, urls in crypter: #attach container content try: - data = self.decryptContainer(name, url) + data = self.decrypt(name, urls) except: print_exc() self.m.log.error("Could not decrypt container.") @@ -169,34 +167,14 @@ class InfoThread(BaseThread): cb(pluginname, result) - def decryptContainer(self, plugin, url): - data = [] - # only works on container plugins - - self.m.log.debug("Pre decrypting %s with %s" % (url, plugin)) - - # dummy pyfile - pyfile = PyFile(self.m.core.files, -1, url, url, 0, 0, "", plugin, -1, -1) - - pyfile.initPlugin() - - # little plugin lifecycle - try: - pyfile.plugin.setup() - pyfile.plugin.loadToDisk() - pyfile.plugin.decrypt(pyfile) - pyfile.plugin.deleteTmp() - - for pack in pyfile.plugin.packages: - pyfile.plugin.urls.extend(pack[1]) - - data, crypter = self.m.core.pluginManager.parseUrls(pyfile.plugin.urls) + def decrypt(self, plugin, urls): + self.m.log.debug("Pre decrypting %s" % plugin) + klass = self.m.core.pluginManager.loadClass("crypter", plugin) - self.m.log.debug("Got %d links." % len(data)) - - except Exception, e: - self.m.log.debug("Pre decrypting error: %s" % str(e)) - finally: - pyfile.release() + # only decrypt files + if has_method(klass, "decryptFile"): + urls = p.decrypt(urls) + data, crypter = self.m.core.pluginManager.parseUrls(urls) + return data - return data + return [] -- cgit v1.2.3 From 1bb6ebf544b43cacf7c0755c5a8608b79b95e2d6 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sat, 7 Jan 2012 20:11:16 +0100 Subject: MultiHoster plugin type, some fixes, new documentation structure --- module/threads/DecrypterThread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'module/threads') diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py index 8edb97c34..ce3c8cd83 100644 --- a/module/threads/DecrypterThread.py +++ b/module/threads/DecrypterThread.py @@ -72,7 +72,7 @@ class DecrypterThread(BaseThread): self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) self.m.core.api.addFiles(self.pid, urls) - for p in pack_names: + for p in pack_names.itervalues(): self.m.core.api.addPackage(p.name, p.urls, p.dest, pack.password) if not result: -- cgit v1.2.3 From 6eaa7bb25e2254c80c43fe46166142d590e86c64 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sat, 7 Jan 2012 23:58:28 +0100 Subject: some cleanups --- module/threads/BaseThread.py | 1 + module/threads/DownloadThread.py | 54 ++++++++++++++++++++-------------------- module/threads/InfoThread.py | 18 +++----------- 3 files changed, 31 insertions(+), 42 deletions(-) (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py index f4885aadc..526913e9b 100644 --- a/module/threads/BaseThread.py +++ b/module/threads/BaseThread.py @@ -21,6 +21,7 @@ class BaseThread(Thread): Thread.__init__(self) self.setDaemon(True) self.m = manager #thread manager + self.core = manager.core self.log = manager.core.log def writeDebugReport(self, name, pyfile=None, plugin=None): diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 638861338..e140703d5 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -60,18 +60,18 @@ class DownloadThread(BaseThread): #this pyfile was deleted while queueing pyfile.plugin.checkForSameFiles(starting=True) - self.m.log.info(_("Download starts: %s" % pyfile.name)) + self.log.info(_("Download starts: %s" % pyfile.name)) # start download - self.m.core.hookManager.downloadPreparing(pyfile) + self.core.hookManager.downloadPreparing(pyfile) pyfile.plugin.preprocessing(self) - self.m.log.info(_("Download finished: %s") % pyfile.name) - self.m.core.hookManager.downloadFinished(pyfile) - self.m.core.files.checkPackageFinished(pyfile) + self.log.info(_("Download finished: %s") % pyfile.name) + self.core.hookManager.downloadFinished(pyfile) + self.core.files.checkPackageFinished(pyfile) except NotImplementedError: - self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) + self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) pyfile.setStatus("failed") pyfile.error = "Plugin does not work" self.clean(pyfile) @@ -79,7 +79,7 @@ class DownloadThread(BaseThread): except Abort: try: - self.m.log.info(_("Download aborted: %s") % pyfile.name) + self.log.info(_("Download aborted: %s") % pyfile.name) except: pass @@ -99,7 +99,7 @@ class DownloadThread(BaseThread): except Retry, e: reason = e.args[0] - self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) + self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) self.queue.put(pyfile) continue @@ -108,16 +108,16 @@ class DownloadThread(BaseThread): if msg == "offline": pyfile.setStatus("offline") - self.m.log.warning(_("Download is offline: %s") % pyfile.name) + self.log.warning(_("Download is offline: %s") % pyfile.name) elif msg == "temp. offline": pyfile.setStatus("temp. offline") - self.m.log.warning(_("Download is temporary offline: %s") % pyfile.name) + self.log.warning(_("Download is temporary offline: %s") % pyfile.name) else: pyfile.setStatus("failed") - self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) + self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) pyfile.error = msg - self.m.core.hookManager.downloadFailed(pyfile) + self.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) continue @@ -128,10 +128,10 @@ class DownloadThread(BaseThread): code = 0 msg = e.args - self.m.log.debug("pycurl exception %s: %s" % (code, msg)) + self.log.debug("pycurl exception %s: %s" % (code, msg)) if code in (7, 18, 28, 52, 56): - self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) + self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) wait = time() + 60 pyfile.waitUntil = wait @@ -142,7 +142,7 @@ class DownloadThread(BaseThread): break if pyfile.abort: - self.m.log.info(_("Download aborted: %s") % pyfile.name) + self.log.info(_("Download aborted: %s") % pyfile.name) pyfile.setStatus("aborted") self.clean(pyfile) @@ -153,12 +153,12 @@ class DownloadThread(BaseThread): else: pyfile.setStatus("failed") - self.m.log.error("pycurl error %s: %s" % (code, msg)) - if self.m.core.debug: + self.log.error("pycurl error %s: %s" % (code, msg)) + if self.core.debug: print_exc() self.writeDebugReport(pyfile.pluginname, pyfile) - self.m.core.hookManager.downloadFailed(pyfile) + self.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) continue @@ -166,34 +166,34 @@ class DownloadThread(BaseThread): except SkipDownload, e: pyfile.setStatus("skipped") - self.m.log.info( - _("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message}) + self.log.info(_("Download skipped: %(name)s due to %(plugin)s") + % {"name": pyfile.name, "plugin": e.message}) self.clean(pyfile) - self.m.core.files.checkPackageFinished(pyfile) + self.core.files.checkPackageFinished(pyfile) self.active = False - self.m.core.files.save() + self.core.files.save() continue except Exception, e: pyfile.setStatus("failed") - self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) + self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) pyfile.error = str(e) - if self.m.core.debug: + if self.core.debug: print_exc() self.writeDebugReport(pyfile.pluginname, pyfile) - self.m.core.hookManager.downloadFailed(pyfile) + self.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) continue finally: - self.m.core.files.save() + self.core.files.save() pyfile.checkIfProcessed() exc_clear() @@ -202,7 +202,7 @@ class DownloadThread(BaseThread): self.active = False pyfile.finishIfDone() - self.m.core.files.save() + self.core.files.save() def put(self, job): diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py index c1e4458ef..5f21d487c 100644 --- a/module/threads/InfoThread.py +++ b/module/threads/InfoThread.py @@ -5,9 +5,8 @@ from time import time from traceback import print_exc from module.Api import OnlineStatus -from module.PyFile import PyFile from module.common.packagetools import parseNames -from module.utils import has_method +from module.utils import has_method, accumulate from BaseThread import BaseThread @@ -29,16 +28,9 @@ class InfoThread(BaseThread): def run(self): """run method""" - plugins = {} + plugins = accumulate(self.data) crypter = {} - for url, plugin in self.data: - if plugin in plugins: - plugins[plugin].append(url) - else: - plugins[plugin] = [url] - - # filter out crypter plugins for name in self.m.core.pluginManager.getPlugins("crypter"): if name in plugins: @@ -68,11 +60,7 @@ class InfoThread(BaseThread): self.m.log.error("Could not decrypt container.") data = [] - for url, plugin in data: - if plugin in plugins: - plugins[plugin].append(url) - else: - plugins[plugin] = [url] + accumulate(data, plugins) self.m.infoResults[self.rid] = {} -- cgit v1.2.3 From bac28b7740aae772636d8b90e291d9c17dfd59a7 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sun, 8 Jan 2012 14:44:59 +0100 Subject: new MultiHoster hook --- module/threads/DownloadThread.py | 4 ++-- module/threads/InfoThread.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'module/threads') diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index e140703d5..c151831a3 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -156,7 +156,7 @@ class DownloadThread(BaseThread): self.log.error("pycurl error %s: %s" % (code, msg)) if self.core.debug: print_exc() - self.writeDebugReport(pyfile.pluginname, pyfile) + self.writeDebugReport(pyfile.plugin.__name__, pyfile) self.core.hookManager.downloadFailed(pyfile) @@ -186,7 +186,7 @@ class DownloadThread(BaseThread): if self.core.debug: print_exc() - self.writeDebugReport(pyfile.pluginname, pyfile) + self.writeDebugReport(pyfile.plugin.__name__, pyfile) self.core.hookManager.downloadFailed(pyfile) self.clean(pyfile) diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py index 5f21d487c..7db85803a 100644 --- a/module/threads/InfoThread.py +++ b/module/threads/InfoThread.py @@ -40,8 +40,8 @@ class InfoThread(BaseThread): #directly write to database if self.pid > -1: for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - klass = getattr(plugin, pluginname) + plugin = self.m.core.pluginManager.getPluginModule(pluginname) + klass = self.m.core.pluginManager.getPluginClass(pluginname) if has_method(klass, "getInfo"): self.fetchForPlugin(pluginname, klass, urls, self.updateDB) self.m.core.files.save() -- cgit v1.2.3 From c7ad1cc5b4a5d190a060e3ddd9274c3065da6708 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Fri, 13 Jan 2012 23:24:21 +0100 Subject: plugin unit test, closed #499, #500 --- module/threads/BaseThread.py | 1 + 1 file changed, 1 insertion(+) (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py index 526913e9b..f6fac46a0 100644 --- a/module/threads/BaseThread.py +++ b/module/threads/BaseThread.py @@ -68,6 +68,7 @@ class BaseThread(Thread): f.close() self.log.info("Debug Report written to %s" % dump_name) + return dump_name def getFileDump(self, pyfile): dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( -- cgit v1.2.3 From 21791b33f033c7a182e64ca6f878bf0ec1d8f88d Mon Sep 17 00:00:00 2001 From: RaNaN Date: Fri, 20 Jan 2012 00:01:15 +0100 Subject: parallel dl limit for hoster --- module/threads/ThreadManager.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) (limited to 'module/threads') diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index 612da2536..6a5096b8c 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -30,7 +30,7 @@ import pycurl from module.PyFile import PyFile from module.network.RequestFactory import getURL -from module.utils import lock +from module.utils import lock, uniqify from module.utils.fs import free_space from DecrypterThread import DecrypterThread @@ -264,14 +264,14 @@ class ThreadManager: free = [x for x in self.threads if not x.active] - inuse = set([(x.active.pluginname,self.getLimit(x)) for x in self.threads if x.active and x.active.hasPlugin() and x.active.plugin.account]) + inuse = uniqify([(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()]) inuse = map(lambda x : (x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) ,inuse) onlimit = [x[0] for x in inuse if 0 < x[1] <= x[2]] occ = [x.active.pluginname for x in self.threads if x.active and x.active.hasPlugin() and not x.active.plugin.multiDL] + onlimit occ.sort() - occ = tuple(set(occ)) + occ = tuple(uniqify(occ)) job = self.core.files.getJob(occ) if job: try: @@ -299,12 +299,6 @@ class ThreadManager: self.core.files.jobCache[occ] = [] self.core.files.jobCache[occ].append(job.id) - def getLimit(self, thread): - limit = thread.active.plugin.account.options.get("limitDL","0") - if limit == "": limit = "0" - return int(limit) - - def cleanup(self): """do global cleanup, should be called when finished with pycurl""" pycurl.global_cleanup() -- cgit v1.2.3 From 567c1d512e2e7452969d42d272d600d694e40058 Mon Sep 17 00:00:00 2001 From: zoidberg10 Date: Fri, 20 Jan 2012 01:58:41 +0100 Subject: fix threadmanager limitdl, add zevera.com --- module/threads/ThreadManager.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'module/threads') diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index 6a5096b8c..a42d6df16 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -266,12 +266,8 @@ class ThreadManager: inuse = uniqify([(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()]) inuse = map(lambda x : (x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) ,inuse) - onlimit = [x[0] for x in inuse if 0 < x[1] <= x[2]] - - occ = [x.active.pluginname for x in self.threads if x.active and x.active.hasPlugin() and not x.active.plugin.multiDL] + onlimit + occ = tuple(sorted([x[0] for x in inuse if 0 < x[1] <= x[2]])) - occ.sort() - occ = tuple(uniqify(occ)) job = self.core.files.getJob(occ) if job: try: -- cgit v1.2.3 From 0d77eac8e411b2467df343a6dd6e0fa932d80864 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Fri, 20 Jan 2012 09:52:57 +0100 Subject: little threadmanager fix --- module/threads/ThreadManager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'module/threads') diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index a42d6df16..f8b5c0aba 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -264,9 +264,9 @@ class ThreadManager: free = [x for x in self.threads if not x.active] - inuse = uniqify([(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()]) - inuse = map(lambda x : (x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) ,inuse) - occ = tuple(sorted([x[0] for x in inuse if 0 < x[1] <= x[2]])) + inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()] + inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse] + occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) job = self.core.files.getJob(occ) if job: -- cgit v1.2.3 From ebe0e6039d822e9c16a6095dba8691066bc3b466 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Mon, 13 Feb 2012 12:56:40 +0000 Subject: Catch internal server errors on the right place. --- module/threads/DownloadThread.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'module/threads') diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index c151831a3..879dbf8bd 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -26,6 +26,7 @@ from pycurl import error from module.plugins.Base import Fail, Retry from module.plugins.Hoster import Abort, Reconnect, SkipDownload +from module.network.HTTPRequest import BadHeader from BaseThread import BaseThread @@ -102,7 +103,12 @@ class DownloadThread(BaseThread): self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) self.queue.put(pyfile) continue - + except BadHeader, e: + if e.code == 500: + self.log.info(_("Internal Server Error")) + pyfile.error = _("Internal Server Error") + pyfile.plugin.tempOffline() + raise e except Fail, e: msg = e.args[0] @@ -212,4 +218,4 @@ class DownloadThread(BaseThread): def stop(self): """stops the thread""" - self.put("quit") \ No newline at end of file + self.put("quit") -- cgit v1.2.3 From 224683926624cf05d3441dae157de1a0ab68b973 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Mon, 13 Feb 2012 14:14:38 +0100 Subject: catch server errors correctly --- module/threads/DownloadThread.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'module/threads') diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 879dbf8bd..bd15b9b87 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -103,12 +103,6 @@ class DownloadThread(BaseThread): self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) self.queue.put(pyfile) continue - except BadHeader, e: - if e.code == 500: - self.log.info(_("Internal Server Error")) - pyfile.error = _("Internal Server Error") - pyfile.plugin.tempOffline() - raise e except Fail, e: msg = e.args[0] @@ -185,10 +179,16 @@ class DownloadThread(BaseThread): continue - except Exception, e: - pyfile.setStatus("failed") - self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) - pyfile.error = str(e) + except (Exception, BadHeader), e: + if isinstance(e, BadHeader) and e.code == 500: + pyfile.setStatus("temp. offline") + self.log.warning(_("Download is temporary offline: %s") % pyfile.name) + pyfile.error = _("Internal Server Error") + + else: + pyfile.setStatus("failed") + self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) + pyfile.error = str(e) if self.core.debug: print_exc() -- cgit v1.2.3 From 4df2b77fdf42046fe19bd371be7c7255986b5980 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Tue, 6 Mar 2012 13:36:39 +0100 Subject: renamed hooks to addons, new filemanager and database, many new api methods you will loose ALL your LINKS, webinterface will NOT work --- module/threads/AddonThread.py | 65 +++++++++++++++++++++++++++++++++++++++ module/threads/DecrypterThread.py | 3 +- module/threads/DownloadThread.py | 12 ++++---- module/threads/HookThread.py | 65 --------------------------------------- module/threads/InfoThread.py | 6 ++-- module/threads/ThreadManager.py | 6 ++-- 6 files changed, 79 insertions(+), 78 deletions(-) create mode 100644 module/threads/AddonThread.py delete mode 100644 module/threads/HookThread.py (limited to 'module/threads') diff --git a/module/threads/AddonThread.py b/module/threads/AddonThread.py new file mode 100644 index 000000000..3a378ad6e --- /dev/null +++ b/module/threads/AddonThread.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from copy import copy +from traceback import print_exc + +from BaseThread import BaseThread + +class AddonThread(BaseThread): + """thread for addons""" + + def __init__(self, m, function, args, kwargs): + """Constructor""" + BaseThread.__init__(self, m) + + self.f = function + self.args = args + self.kwargs = kwargs + + self.active = [] + + m.localThreads.append(self) + + self.start() + + def getActiveFiles(self): + return self.active + + def addActive(self, pyfile): + """ Adds a pyfile to active list and thus will be displayed on overview""" + if pyfile not in self.active: + self.active.append(pyfile) + + def finishFile(self, pyfile): + if pyfile in self.active: + self.active.remove(pyfile) + + pyfile.finishIfDone() + + def run(self): + try: + try: + self.kwargs["thread"] = self + self.f(*self.args, **self.kwargs) + except TypeError, e: + #dirty method to filter out exceptions + if "unexpected keyword argument 'thread'" not in e.args[0]: + raise + + del self.kwargs["thread"] + self.f(*self.args, **self.kwargs) + except Exception, e: + if hasattr(self.f, "im_self"): + addon = self.f.im_self + addon.logError(_("An Error occured"), e) + if self.m.core.debug: + print_exc() + self.writeDebugReport(addon.__name__, plugin=addon) + + finally: + local = copy(self.active) + for x in local: + self.finishFile(x) + + self.m.localThreads.remove(self) \ No newline at end of file diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py index ce3c8cd83..39448a620 100644 --- a/module/threads/DecrypterThread.py +++ b/module/threads/DecrypterThread.py @@ -55,6 +55,7 @@ class DecrypterThread(BaseThread): plugin.logDebug("Decrypted", plugin_result) result.extend(plugin_result) + #TODO result = uniqify(result) pack_names = {} urls = [] @@ -73,7 +74,7 @@ class DecrypterThread(BaseThread): self.m.core.api.addFiles(self.pid, urls) for p in pack_names.itervalues(): - self.m.core.api.addPackage(p.name, p.urls, p.dest, pack.password) + self.m.core.api.addPackage(p.name, p.urls, pack.password) if not result: self.log.info(_("No links decrypted")) diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index bd15b9b87..6239cddd8 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -64,11 +64,11 @@ class DownloadThread(BaseThread): self.log.info(_("Download starts: %s" % pyfile.name)) # start download - self.core.hookManager.downloadPreparing(pyfile) + self.core.addonManager.downloadPreparing(pyfile) pyfile.plugin.preprocessing(self) self.log.info(_("Download finished: %s") % pyfile.name) - self.core.hookManager.downloadFinished(pyfile) + self.core.addonManager.downloadFinished(pyfile) self.core.files.checkPackageFinished(pyfile) except NotImplementedError: @@ -117,7 +117,7 @@ class DownloadThread(BaseThread): self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) pyfile.error = msg - self.core.hookManager.downloadFailed(pyfile) + self.core.addonManager.downloadFailed(pyfile) self.clean(pyfile) continue @@ -158,7 +158,7 @@ class DownloadThread(BaseThread): print_exc() self.writeDebugReport(pyfile.plugin.__name__, pyfile) - self.core.hookManager.downloadFailed(pyfile) + self.core.addonManager.downloadFailed(pyfile) self.clean(pyfile) continue @@ -179,7 +179,7 @@ class DownloadThread(BaseThread): continue - except (Exception, BadHeader), e: + except Exception, e: if isinstance(e, BadHeader) and e.code == 500: pyfile.setStatus("temp. offline") self.log.warning(_("Download is temporary offline: %s") % pyfile.name) @@ -194,7 +194,7 @@ class DownloadThread(BaseThread): print_exc() self.writeDebugReport(pyfile.plugin.__name__, pyfile) - self.core.hookManager.downloadFailed(pyfile) + self.core.addonManager.downloadFailed(pyfile) self.clean(pyfile) continue diff --git a/module/threads/HookThread.py b/module/threads/HookThread.py deleted file mode 100644 index bffa72ca0..000000000 --- a/module/threads/HookThread.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from copy import copy -from traceback import print_exc - -from BaseThread import BaseThread - -class HookThread(BaseThread): - """thread for hooks""" - - def __init__(self, m, function, args, kwargs): - """Constructor""" - BaseThread.__init__(self, m) - - self.f = function - self.args = args - self.kwargs = kwargs - - self.active = [] - - m.localThreads.append(self) - - self.start() - - def getActiveFiles(self): - return self.active - - def addActive(self, pyfile): - """ Adds a pyfile to active list and thus will be displayed on overview""" - if pyfile not in self.active: - self.active.append(pyfile) - - def finishFile(self, pyfile): - if pyfile in self.active: - self.active.remove(pyfile) - - pyfile.finishIfDone() - - def run(self): - try: - try: - self.kwargs["thread"] = self - self.f(*self.args, **self.kwargs) - except TypeError, e: - #dirty method to filter out exceptions - if "unexpected keyword argument 'thread'" not in e.args[0]: - raise - - del self.kwargs["thread"] - self.f(*self.args, **self.kwargs) - except Exception, e: - if hasattr(self.f, "im_self"): - hook = self.f.im_self - hook.logError(_("An Error occured"), e) - if self.m.core.debug: - print_exc() - self.writeDebugReport(hook.__name__, plugin=hook) - - finally: - local = copy(self.active) - for x in local: - self.finishFile(x) - - self.m.localThreads.remove(self) \ No newline at end of file diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py index 7db85803a..a8a2c6e7e 100644 --- a/module/threads/InfoThread.py +++ b/module/threads/InfoThread.py @@ -4,7 +4,7 @@ from time import time from traceback import print_exc -from module.Api import OnlineStatus +from module.Api import LinkStatus from module.common.packagetools import parseNames from module.utils import has_method, accumulate @@ -100,7 +100,7 @@ class InfoThread(BaseThread): if len(self.cache) >= 20 or force: #used for package generating - tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size)))) + tmp = [(name, (url, LinkStatus(name, plugin, "unknown", status, int(size)))) for name, size, status, url in self.cache] data = parseNames(tmp) @@ -161,7 +161,7 @@ class InfoThread(BaseThread): # only decrypt files if has_method(klass, "decryptFile"): - urls = p.decrypt(urls) + urls = klass.decrypt(urls) data, crypter = self.m.core.pluginManager.parseUrls(urls) return data diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index f8b5c0aba..b3a1e8c6c 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -47,7 +47,7 @@ class ThreadManager: self.log = core.log self.threads = [] # thread list - self.localThreads = [] #hook+decrypter threads + self.localThreads = [] #addon+decrypter threads self.pause = True @@ -189,7 +189,7 @@ class ThreadManager: ip = self.getIP() - self.core.hookManager.beforeReconnecting(ip) + self.core.addonManager.beforeReconnecting(ip) self.log.debug("Old IP: %s" % ip) @@ -206,7 +206,7 @@ class ThreadManager: reconn.wait() sleep(1) ip = self.getIP() - self.core.hookManager.afterReconnecting(ip) + self.core.addonManager.afterReconnecting(ip) self.log.info(_("Reconnected, new IP: %s") % ip) -- cgit v1.2.3 From 50d4df8b4d48b855bd18e9922355b7f3f2b4da4e Mon Sep 17 00:00:00 2001 From: RaNaN Date: Tue, 20 Mar 2012 14:57:45 +0100 Subject: captcha decrypting for all plugin types, new interaction manager --- module/threads/DownloadThread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'module/threads') diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 6239cddd8..8166191af 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -24,8 +24,8 @@ from traceback import print_exc from sys import exc_clear from pycurl import error -from module.plugins.Base import Fail, Retry -from module.plugins.Hoster import Abort, Reconnect, SkipDownload +from module.plugins.Base import Fail, Retry, Abort +from module.plugins.Hoster import Reconnect, SkipDownload from module.network.HTTPRequest import BadHeader from BaseThread import BaseThread -- cgit v1.2.3 From b40b32ee05f611323a7827fad2a25fa0a28dcb24 Mon Sep 17 00:00:00 2001 From: X3n0m0rph59 Date: Sun, 22 Apr 2012 19:56:17 +0200 Subject: a huge pile of spelling fixes --- module/threads/BaseThread.py | 2 +- module/threads/DownloadThread.py | 4 ++-- module/threads/ThreadManager.py | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) (limited to 'module/threads') diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py index f6fac46a0..7a0ee5ee4 100644 --- a/module/threads/BaseThread.py +++ b/module/threads/BaseThread.py @@ -131,6 +131,6 @@ class BaseThread(Thread): return "" def clean(self, pyfile): - """ set thread unactive and release pyfile """ + """ set thread inactive and release pyfile """ self.active = False pyfile.release() diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 8166191af..7555a82ce 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -58,7 +58,7 @@ class DownloadThread(BaseThread): try: if not pyfile.hasPlugin(): continue - #this pyfile was deleted while queueing + #this pyfile was deleted while queuing pyfile.plugin.checkForSameFiles(starting=True) self.log.info(_("Download starts: %s" % pyfile.name)) @@ -212,7 +212,7 @@ class DownloadThread(BaseThread): def put(self, job): - """assing job to thread""" + """assign a job to the thread""" self.queue.put(job) diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index b3a1e8c6c..c3da13430 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -82,7 +82,7 @@ class ThreadManager: self.threads.append(thread) def createInfoThread(self, data, pid): - """ start a thread whichs fetches online status and other infos """ + """ start a thread which fetches online status and other info's """ self.timestamp = time() + 5 * 60 if data: InfoThread(self, data, pid) @@ -134,7 +134,7 @@ class ThreadManager: def work(self): - """run all task which have to be done (this is for repetivive call by core)""" + """run all task which have to be done (this is for repetetive call by core)""" try: self.tryReconnect() except Exception, e: @@ -231,7 +231,7 @@ class ThreadManager: return ip def checkThreadCount(self): - """checks if there are need for increasing or reducing thread count""" + """checks if there is a need for increasing or reducing thread count""" if len(self.threads) == self.core.config.get("download", "max_downloads"): return True @@ -244,7 +244,7 @@ class ThreadManager: def cleanPycurl(self): - """ make a global curl cleanup (currently ununused) """ + """ make a global curl cleanup (currently unused) """ if self.processingIds(): return False pycurl.global_cleanup() @@ -255,7 +255,7 @@ class ThreadManager: def assignJob(self): - """assing a job to a thread if possible""" + """assign a job to a thread if possible""" if self.pause or not self.core.api.isTimeDownload(): return -- cgit v1.2.3 From 2a74f88fcaf3d3baac24397de81a967c0b8c73e2 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Thu, 17 May 2012 20:06:11 +0200 Subject: small typo fixes and TODOs --- module/threads/AddonThread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'module/threads') diff --git a/module/threads/AddonThread.py b/module/threads/AddonThread.py index 3a378ad6e..b6a552e0e 100644 --- a/module/threads/AddonThread.py +++ b/module/threads/AddonThread.py @@ -37,7 +37,7 @@ class AddonThread(BaseThread): pyfile.finishIfDone() - def run(self): + def run(self): #TODO: approach via func_code try: try: self.kwargs["thread"] = self -- cgit v1.2.3 From 0d2d6daef850ac6bcc7fafccd230e52d2a862c2c Mon Sep 17 00:00:00 2001 From: RaNaN Date: Sun, 3 Jun 2012 17:45:10 +0200 Subject: updates for database + api --- module/threads/ThreadManager.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) (limited to 'module/threads') diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py index c3da13430..e3407aac3 100644 --- a/module/threads/ThreadManager.py +++ b/module/threads/ThreadManager.py @@ -1,22 +1,20 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -""" - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, see . - - @author: RaNaN -""" +############################################################################### +# Copyright(c) 2008-2012 pyLoad Team +# http://www.pyload.org +# +# This file is part of pyLoad. +# pyLoad is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Subjected to the terms and conditions in LICENSE +# +# @author: RaNaN +############################################################################### from os.path import exists, join import re @@ -28,7 +26,7 @@ from random import choice import pycurl -from module.PyFile import PyFile +from module.datatypes import PyFile from module.network.RequestFactory import getURL from module.utils import lock, uniqify from module.utils.fs import free_space -- cgit v1.2.3 From 1a55cb6a2eb8784253410b2e93510b5bcebf7f41 Mon Sep 17 00:00:00 2001 From: RaNaN Date: Mon, 10 Sep 2012 15:12:55 +0200 Subject: userApi for plugins --- module/threads/DownloadThread.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'module/threads') diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py index 7555a82ce..0269b0660 100644 --- a/module/threads/DownloadThread.py +++ b/module/threads/DownloadThread.py @@ -106,6 +106,8 @@ class DownloadThread(BaseThread): except Fail, e: msg = e.args[0] + # TODO: activate former skipped downloads + if msg == "offline": pyfile.setStatus("offline") self.log.warning(_("Download is offline: %s") % pyfile.name) -- cgit v1.2.3