diff options
Diffstat (limited to 'module/threads')
-rw-r--r-- | module/threads/BaseThread.py | 136 | ||||
-rw-r--r-- | module/threads/DecrypterThread.py | 80 | ||||
-rw-r--r-- | module/threads/DownloadThread.py | 215 | ||||
-rw-r--r-- | module/threads/HookThread.py | 65 | ||||
-rw-r--r-- | module/threads/InfoThread.py | 168 | ||||
-rw-r--r-- | module/threads/ThreadManager.py | 300 | ||||
-rw-r--r-- | module/threads/__init__.py | 0 |
7 files changed, 964 insertions, 0 deletions
diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py new file mode 100644 index 000000000..f6fac46a0 --- /dev/null +++ b/module/threads/BaseThread.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import os +import sys +import locale + +from threading import Thread +from time import strftime, gmtime +from sys import exc_info +from types import MethodType +from pprint import pformat +from traceback import format_exc + +from module.utils.fs import listdir, join, save_join, stat, exists + +class BaseThread(Thread): + """abstract base class for thread types""" + + def __init__(self, manager): + Thread.__init__(self) + self.setDaemon(True) + self.m = manager #thread manager + self.core = manager.core + self.log = manager.core.log + + def writeDebugReport(self, name, pyfile=None, plugin=None): + """ writes a debug report to disk """ + + dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S")) + if pyfile: + dump = self.getFileDump(pyfile) + else: + dump = self.getPluginDump(plugin) + + try: + import zipfile + + zip = zipfile.ZipFile(dump_name, "w") + + if exists(join("tmp", name)): + for f in listdir(join("tmp", name)): + try: + # avoid encoding errors + zip.write(join("tmp", name, f), save_join(name, f)) + except: + pass + + info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime()) + info.external_attr = 0644 << 16L # change permissions + zip.writestr(info, dump) + + info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime()) + info.external_attr = 0644 << 16L + zip.writestr(info, self.getSystemDump()) + + zip.close() + + if not stat(dump_name).st_size: + raise Exception("Empty Zipfile") + + except Exception, e: + self.log.debug("Error creating zip file: %s" % e) + + dump_name = dump_name.replace(".zip", ".txt") + f = open(dump_name, "wb") + f.write(dump) + f.close() + + self.log.info("Debug Report written to %s" % dump_name) + return dump_name + + def getFileDump(self, pyfile): + dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( + self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) + + tb = exc_info()[2] + stack = [] + while tb: + stack.append(tb.tb_frame) + tb = tb.tb_next + + for frame in stack[1:]: + dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, + frame.f_code.co_filename, + frame.f_lineno) + + for key, value in frame.f_locals.items(): + dump += "\t%20s = " % key + try: + dump += pformat(value) + "\n" + except Exception, e: + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + + del frame + + del stack #delete it just to be sure... + + dump += "\n\nPLUGIN OBJECT DUMP: \n\n" + + for name in dir(pyfile.plugin): + attr = getattr(pyfile.plugin, name) + if not name.endswith("__") and type(attr) != MethodType: + dump += "\t%20s = " % name + try: + dump += pformat(attr) + "\n" + except Exception, e: + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + + dump += "\nPYFILE OBJECT DUMP: \n\n" + + for name in dir(pyfile): + attr = getattr(pyfile, name) + if not name.endswith("__") and type(attr) != MethodType: + dump += "\t%20s = " % name + try: + dump += pformat(attr) + "\n" + except Exception, e: + dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" + + dump += "\n\nCONFIG: \n\n" + dump += pformat(self.m.core.config.values) + "\n" + + return dump + + #TODO + def getPluginDump(self, plugin): + return "" + + def getSystemDump(self): + return "" + + def clean(self, pyfile): + """ set thread unactive and release pyfile """ + self.active = False + pyfile.release() diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py new file mode 100644 index 000000000..ce3c8cd83 --- /dev/null +++ b/module/threads/DecrypterThread.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from time import sleep +from traceback import print_exc + +from module.utils import uniqify +from module.plugins.Base import Retry +from module.plugins.Crypter import Package + +from BaseThread import BaseThread + +class DecrypterThread(BaseThread): + """thread for decrypting""" + + def __init__(self, manager, data, pid): + """constructor""" + BaseThread.__init__(self, manager) + self.data = data + self.pid = pid + + self.start() + + def run(self): + plugin_map = {} + for url, plugin in self.data: + if plugin in plugin_map: + plugin_map[plugin].append(url) + else: + plugin_map[plugin] = [url] + + self.decrypt(plugin_map) + + def decrypt(self, plugin_map): + pack = self.m.core.files.getPackage(self.pid) + result = [] + + for name, urls in plugin_map.iteritems(): + klass = self.m.core.pluginManager.loadClass("crypter", name) + plugin = klass(self.m.core, pack, pack.password) + plugin_result = [] + + try: + try: + plugin_result = plugin._decrypt(urls) + except Retry: + sleep(1) + plugin_result = plugin._decrypt(urls) + except Exception, e: + plugin.logError(_("Decrypting failed"), e) + if self.m.core.debug: + print_exc() + self.writeDebugReport(plugin.__name__, plugin=plugin) + + plugin.logDebug("Decrypted", plugin_result) + result.extend(plugin_result) + + result = uniqify(result) + pack_names = {} + urls = [] + + for p in result: + if isinstance(p, Package): + if p.name in pack_names: + pack_names[p.name].urls.extend(p.urls) + else: + pack_names[p.name] = p + else: + urls.append(p) + + if urls: + self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) + self.m.core.api.addFiles(self.pid, urls) + + for p in pack_names.itervalues(): + self.m.core.api.addPackage(p.name, p.urls, p.dest, pack.password) + + if not result: + self.log.info(_("No links decrypted")) + diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py new file mode 100644 index 000000000..c151831a3 --- /dev/null +++ b/module/threads/DownloadThread.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. + + @author: RaNaN +""" + +from Queue import Queue +from time import sleep, time +from traceback import print_exc +from sys import exc_clear +from pycurl import error + +from module.plugins.Base import Fail, Retry +from module.plugins.Hoster import Abort, Reconnect, SkipDownload + +from BaseThread import BaseThread + +class DownloadThread(BaseThread): + """thread for downloading files from 'real' hoster plugins""" + + def __init__(self, manager): + """Constructor""" + BaseThread.__init__(self, manager) + + self.queue = Queue() # job queue + self.active = False + + self.start() + + def run(self): + """run method""" + pyfile = None + + while True: + del pyfile + self.active = self.queue.get() + pyfile = self.active + + if self.active == "quit": + self.active = False + self.m.threads.remove(self) + return True + + try: + if not pyfile.hasPlugin(): continue + #this pyfile was deleted while queueing + + pyfile.plugin.checkForSameFiles(starting=True) + self.log.info(_("Download starts: %s" % pyfile.name)) + + # start download + self.core.hookManager.downloadPreparing(pyfile) + pyfile.plugin.preprocessing(self) + + self.log.info(_("Download finished: %s") % pyfile.name) + self.core.hookManager.downloadFinished(pyfile) + self.core.files.checkPackageFinished(pyfile) + + except NotImplementedError: + self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) + pyfile.setStatus("failed") + pyfile.error = "Plugin does not work" + self.clean(pyfile) + continue + + except Abort: + try: + self.log.info(_("Download aborted: %s") % pyfile.name) + except: + pass + + pyfile.setStatus("aborted") + + self.clean(pyfile) + continue + + except Reconnect: + self.queue.put(pyfile) + #pyfile.req.clearCookies() + + while self.m.reconnecting.isSet(): + sleep(0.5) + + continue + + except Retry, e: + reason = e.args[0] + self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) + self.queue.put(pyfile) + continue + + except Fail, e: + msg = e.args[0] + + if msg == "offline": + pyfile.setStatus("offline") + self.log.warning(_("Download is offline: %s") % pyfile.name) + elif msg == "temp. offline": + pyfile.setStatus("temp. offline") + self.log.warning(_("Download is temporary offline: %s") % pyfile.name) + else: + pyfile.setStatus("failed") + self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) + pyfile.error = msg + + self.core.hookManager.downloadFailed(pyfile) + self.clean(pyfile) + continue + + except error, e: + if len(e.args) == 2: + code, msg = e.args + else: + code = 0 + msg = e.args + + self.log.debug("pycurl exception %s: %s" % (code, msg)) + + if code in (7, 18, 28, 52, 56): + self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) + wait = time() + 60 + + pyfile.waitUntil = wait + pyfile.setStatus("waiting") + while time() < wait: + sleep(1) + if pyfile.abort: + break + + if pyfile.abort: + self.log.info(_("Download aborted: %s") % pyfile.name) + pyfile.setStatus("aborted") + + self.clean(pyfile) + else: + self.queue.put(pyfile) + + continue + + else: + pyfile.setStatus("failed") + self.log.error("pycurl error %s: %s" % (code, msg)) + if self.core.debug: + print_exc() + self.writeDebugReport(pyfile.plugin.__name__, pyfile) + + self.core.hookManager.downloadFailed(pyfile) + + self.clean(pyfile) + continue + + except SkipDownload, e: + pyfile.setStatus("skipped") + + self.log.info(_("Download skipped: %(name)s due to %(plugin)s") + % {"name": pyfile.name, "plugin": e.message}) + + self.clean(pyfile) + + self.core.files.checkPackageFinished(pyfile) + + self.active = False + self.core.files.save() + + continue + + + except Exception, e: + pyfile.setStatus("failed") + self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) + pyfile.error = str(e) + + if self.core.debug: + print_exc() + self.writeDebugReport(pyfile.plugin.__name__, pyfile) + + self.core.hookManager.downloadFailed(pyfile) + self.clean(pyfile) + continue + + finally: + self.core.files.save() + pyfile.checkIfProcessed() + exc_clear() + + + #pyfile.plugin.req.clean() + + self.active = False + pyfile.finishIfDone() + self.core.files.save() + + + def put(self, job): + """assing job to thread""" + self.queue.put(job) + + + def stop(self): + """stops the thread""" + self.put("quit")
\ No newline at end of file diff --git a/module/threads/HookThread.py b/module/threads/HookThread.py new file mode 100644 index 000000000..bffa72ca0 --- /dev/null +++ b/module/threads/HookThread.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from copy import copy +from traceback import print_exc + +from BaseThread import BaseThread + +class HookThread(BaseThread): + """thread for hooks""" + + def __init__(self, m, function, args, kwargs): + """Constructor""" + BaseThread.__init__(self, m) + + self.f = function + self.args = args + self.kwargs = kwargs + + self.active = [] + + m.localThreads.append(self) + + self.start() + + def getActiveFiles(self): + return self.active + + def addActive(self, pyfile): + """ Adds a pyfile to active list and thus will be displayed on overview""" + if pyfile not in self.active: + self.active.append(pyfile) + + def finishFile(self, pyfile): + if pyfile in self.active: + self.active.remove(pyfile) + + pyfile.finishIfDone() + + def run(self): + try: + try: + self.kwargs["thread"] = self + self.f(*self.args, **self.kwargs) + except TypeError, e: + #dirty method to filter out exceptions + if "unexpected keyword argument 'thread'" not in e.args[0]: + raise + + del self.kwargs["thread"] + self.f(*self.args, **self.kwargs) + except Exception, e: + if hasattr(self.f, "im_self"): + hook = self.f.im_self + hook.logError(_("An Error occured"), e) + if self.m.core.debug: + print_exc() + self.writeDebugReport(hook.__name__, plugin=hook) + + finally: + local = copy(self.active) + for x in local: + self.finishFile(x) + + self.m.localThreads.remove(self)
\ No newline at end of file diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py new file mode 100644 index 000000000..7db85803a --- /dev/null +++ b/module/threads/InfoThread.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from time import time +from traceback import print_exc + +from module.Api import OnlineStatus +from module.common.packagetools import parseNames +from module.utils import has_method, accumulate + +from BaseThread import BaseThread + +class InfoThread(BaseThread): + def __init__(self, manager, data, pid=-1, rid=-1): + """Constructor""" + BaseThread.__init__(self, manager) + + self.data = data + self.pid = pid # package id + # [ .. (name, plugin) .. ] + + self.rid = rid #result id + + self.cache = [] #accumulated data + + self.start() + + def run(self): + """run method""" + + plugins = accumulate(self.data) + crypter = {} + + # filter out crypter plugins + for name in self.m.core.pluginManager.getPlugins("crypter"): + if name in plugins: + crypter[name] = plugins[name] + del plugins[name] + + #directly write to database + if self.pid > -1: + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPluginModule(pluginname) + klass = self.m.core.pluginManager.getPluginClass(pluginname) + if has_method(klass, "getInfo"): + self.fetchForPlugin(pluginname, klass, urls, self.updateDB) + self.m.core.files.save() + elif has_method(plugin, "getInfo"): + self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead") + self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) + self.m.core.files.save() + + else: #post the results + for name, urls in crypter: + #attach container content + try: + data = self.decrypt(name, urls) + except: + print_exc() + self.m.log.error("Could not decrypt container.") + data = [] + + accumulate(data, plugins) + + self.m.infoResults[self.rid] = {} + + for pluginname, urls in plugins.iteritems(): + plugin = self.m.core.pluginManager.getPlugin(pluginname, True) + klass = getattr(plugin, pluginname) + if has_method(klass, "getInfo"): + self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + #force to process cache + if self.cache: + self.updateResult(pluginname, [], True) + elif has_method(plugin, "getInfo"): + self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") + self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) + #force to process cache + if self.cache: + self.updateResult(pluginname, [], True) + else: + #generate default result + result = [(url, 0, 3, url) for url in urls] + + self.updateResult(pluginname, result, True) + + self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} + + self.m.timestamp = time() + 5 * 60 + + + def updateDB(self, plugin, result): + self.m.core.files.updateFileInfo(result, self.pid) + + def updateResult(self, plugin, result, force=False): + #parse package name and generate result + #accumulate results + + self.cache.extend(result) + + if len(self.cache) >= 20 or force: + #used for package generating + tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size)))) + for name, size, status, url in self.cache] + + data = parseNames(tmp) + result = {} + for k, v in data.iteritems(): + for url, status in v: + status.packagename = k + result[url] = status + + self.m.setInfoResults(self.rid, result) + + self.cache = [] + + def updateCache(self, plugin, result): + self.cache.extend(result) + + def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): + try: + result = [] #result loaded from cache + process = [] #urls to process + for url in urls: + if url in self.m.infoCache: + result.append(self.m.infoCache[url]) + else: + process.append(url) + + if result: + self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) + cb(pluginname, result) + + if process: + self.m.log.debug("Run Info Fetching for %s" % pluginname) + for result in plugin.getInfo(process): + #result = [ .. (name, size, status, url) .. ] + if not type(result) == list: result = [result] + + for res in result: + self.m.infoCache[res[3]] = res + + cb(pluginname, result) + + self.m.log.debug("Finished Info Fetching for %s" % pluginname) + except Exception, e: + self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % + {"name": pluginname, "err": str(e)}) + if self.m.core.debug: + print_exc() + + # generate default results + if err: + result = [(url, 0, 3, url) for url in urls] + cb(pluginname, result) + + + def decrypt(self, plugin, urls): + self.m.log.debug("Pre decrypting %s" % plugin) + klass = self.m.core.pluginManager.loadClass("crypter", plugin) + + # only decrypt files + if has_method(klass, "decryptFile"): + urls = p.decrypt(urls) + data, crypter = self.m.core.pluginManager.parseUrls(urls) + return data + + return [] diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py new file mode 100644 index 000000000..f8b5c0aba --- /dev/null +++ b/module/threads/ThreadManager.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. + + @author: RaNaN +""" + +from os.path import exists, join +import re +from subprocess import Popen +from threading import Event, Lock +from time import sleep, time +from traceback import print_exc +from random import choice + +import pycurl + +from module.PyFile import PyFile +from module.network.RequestFactory import getURL +from module.utils import lock, uniqify +from module.utils.fs import free_space + +from DecrypterThread import DecrypterThread +from DownloadThread import DownloadThread +from InfoThread import InfoThread + +class ThreadManager: + """manages the download threads, assign jobs, reconnect etc""" + + + def __init__(self, core): + """Constructor""" + self.core = core + self.log = core.log + + self.threads = [] # thread list + self.localThreads = [] #hook+decrypter threads + + self.pause = True + + self.reconnecting = Event() + self.reconnecting.clear() + self.downloaded = 0 #number of files downloaded since last cleanup + + self.lock = Lock() + + # some operations require to fetch url info from hoster, so we caching them so it wont be done twice + # contains a timestamp and will be purged after timeout + self.infoCache = {} + + # pool of ids for online check + self.resultIDs = 0 + + # threads which are fetching hoster results + self.infoResults = {} + # timeout for cache purge + self.timestamp = 0 + + pycurl.global_init(pycurl.GLOBAL_DEFAULT) + + for i in range(self.core.config.get("download", "max_downloads")): + self.createThread() + + + def createThread(self): + """create a download thread""" + + thread = DownloadThread(self) + self.threads.append(thread) + + def createInfoThread(self, data, pid): + """ start a thread whichs fetches online status and other infos """ + self.timestamp = time() + 5 * 60 + if data: InfoThread(self, data, pid) + + @lock + def createResultThread(self, data): + """ creates a thread to fetch online status, returns result id """ + self.timestamp = time() + 5 * 60 + + rid = self.resultIDs + self.resultIDs += 1 + + InfoThread(self, data, rid=rid) + + return rid + + @lock + def createDecryptThread(self, data, pid): + """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" + if data: DecrypterThread(self, data, pid) + + + @lock + def getInfoResult(self, rid): + """returns result and clears it""" + self.timestamp = time() + 5 * 60 + + if rid in self.infoResults: + data = self.infoResults[rid] + self.infoResults[rid] = {} + return data + else: + return {} + + @lock + def setInfoResults(self, rid, result): + self.infoResults[rid].update(result) + + def getActiveFiles(self): + active = [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] + + for t in self.localThreads: + active.extend(t.getActiveFiles()) + + return active + + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.id for x in self.getActiveFiles()] + + + def work(self): + """run all task which have to be done (this is for repetivive call by core)""" + try: + self.tryReconnect() + except Exception, e: + self.log.error(_("Reconnect Failed: %s") % str(e) ) + self.reconnecting.clear() + if self.core.debug: + print_exc() + self.checkThreadCount() + + try: + self.assignJob() + except Exception, e: + self.log.warning("Assign job error", e) + if self.core.debug: + print_exc() + + sleep(0.5) + self.assignJob() + #it may be failed non critical so we try it again + + if (self.infoCache or self.infoResults) and self.timestamp < time(): + self.infoCache.clear() + self.infoResults.clear() + self.log.debug("Cleared Result cache") + + def tryReconnect(self): + """checks if reconnect needed""" + + if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): + return False + + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] + + if not (0 < active.count(True) == len(active)): + return False + + if not exists(self.core.config['reconnect']['method']): + if exists(join(pypath, self.core.config['reconnect']['method'])): + self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) + else: + self.core.config["reconnect"]["activated"] = False + self.log.warning(_("Reconnect script not found!")) + return + + self.reconnecting.set() + + #Do reconnect + self.log.info(_("Starting reconnect")) + + while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: + sleep(0.25) + + ip = self.getIP() + + self.core.hookManager.beforeReconnecting(ip) + + self.log.debug("Old IP: %s" % ip) + + try: + reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) + except: + self.log.warning(_("Failed executing reconnect script!")) + self.core.config["reconnect"]["activated"] = False + self.reconnecting.clear() + if self.core.debug: + print_exc() + return + + reconn.wait() + sleep(1) + ip = self.getIP() + self.core.hookManager.afterReconnecting(ip) + + self.log.info(_("Reconnected, new IP: %s") % ip) + + self.reconnecting.clear() + + def getIP(self): + """retrieve current ip""" + services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), + ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")] + + ip = "" + for i in range(10): + try: + sv = choice(services) + ip = getURL(sv[0]) + ip = re.match(sv[1], ip).group(1) + break + except: + ip = "" + sleep(1) + + return ip + + def checkThreadCount(self): + """checks if there are need for increasing or reducing thread count""" + + if len(self.threads) == self.core.config.get("download", "max_downloads"): + return True + elif len(self.threads) < self.core.config.get("download", "max_downloads"): + self.createThread() + else: + free = [x for x in self.threads if not x.active] + if free: + free[0].put("quit") + + + def cleanPycurl(self): + """ make a global curl cleanup (currently ununused) """ + if self.processingIds(): + return False + pycurl.global_cleanup() + pycurl.global_init(pycurl.GLOBAL_DEFAULT) + self.downloaded = 0 + self.log.debug("Cleaned up pycurl") + return True + + + def assignJob(self): + """assing a job to a thread if possible""" + + if self.pause or not self.core.api.isTimeDownload(): return + + #if self.downloaded > 20: + # if not self.cleanPyCurl(): return + + free = [x for x in self.threads if not x.active] + + inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()] + inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse] + occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) + + job = self.core.files.getJob(occ) + if job: + try: + job.initPlugin() + except Exception, e: + self.log.critical(str(e)) + print_exc() + job.setStatus("failed") + job.error = str(e) + job.release() + return + + spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 + if spaceLeft < self.core.config["general"]["min_free_space"]: + self.log.warning(_("Not enough space left on device")) + self.pause = True + + if free and not self.pause: + thread = free[0] + #self.downloaded += 1 + thread.put(job) + else: + #put job back + if occ not in self.core.files.jobCache: + self.core.files.jobCache[occ] = [] + self.core.files.jobCache[occ].append(job.id) + + def cleanup(self): + """do global cleanup, should be called when finished with pycurl""" + pycurl.global_cleanup() diff --git a/module/threads/__init__.py b/module/threads/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/module/threads/__init__.py |