diff options
Diffstat (limited to 'module/threads')
-rw-r--r-- | module/threads/AddonThread.py | 65 | ||||
-rw-r--r-- | module/threads/BaseThread.py | 142 | ||||
-rw-r--r-- | module/threads/DecrypterThread.py | 81 | ||||
-rw-r--r-- | module/threads/DownloadThread.py | 231 | ||||
-rw-r--r-- | module/threads/InfoThread.py | 168 | ||||
-rw-r--r-- | module/threads/ThreadManager.py | 313 | ||||
-rw-r--r-- | module/threads/__init__.py | 0 |
7 files changed, 0 insertions, 1000 deletions
diff --git a/module/threads/AddonThread.py b/module/threads/AddonThread.py deleted file mode 100644 index afb56f66b..000000000 --- a/module/threads/AddonThread.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from copy import copy -from traceback import print_exc - -from BaseThread import BaseThread - -class AddonThread(BaseThread): - """thread for addons""" - - def __init__(self, m, function, args, kwargs): - """Constructor""" - BaseThread.__init__(self, m) - - self.f = function - self.args = args - self.kwargs = kwargs - - self.active = [] - - m.localThreads.append(self) - - self.start() - - def getActiveFiles(self): - return self.active - - def addActive(self, pyfile): - """ Adds a pyfile to active list and thus will be displayed on overview""" - if pyfile not in self.active: - self.active.append(pyfile) - - def finishFile(self, pyfile): - if pyfile in self.active: - self.active.remove(pyfile) - - pyfile.finishIfDone() - - def run(self): #TODO: approach via func_code - try: - try: - self.kwargs["thread"] = self - self.f(*self.args, **self.kwargs) - except TypeError, e: - #dirty method to filter out exceptions - if "unexpected keyword argument 'thread'" not in e.args[0]: - raise - - del self.kwargs["thread"] - self.f(*self.args, **self.kwargs) - except Exception, e: - if hasattr(self.f, "im_self"): - addon = self.f.im_self - addon.logError(_("An Error occurred"), e) - if self.m.core.debug: - print_exc() - self.writeDebugReport(addon.__name__, plugin=addon) - - finally: - local = copy(self.active) - for x in local: - self.finishFile(x) - - self.m.localThreads.remove(self)
\ No newline at end of file diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py deleted file mode 100644 index c64678a72..000000000 --- a/module/threads/BaseThread.py +++ /dev/null @@ -1,142 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from threading import Thread -from time import strftime, gmtime -from sys import exc_info -from types import MethodType -from pprint import pformat -from traceback import format_exc - -from module.utils import primary_uid -from module.utils.fs import listdir, join, save_join, stat, exists - -class BaseThread(Thread): - """abstract base class for thread types""" - - def __init__(self, manager): - Thread.__init__(self) - self.setDaemon(True) - self.m = manager #thread manager - self.core = manager.core - self.log = manager.core.log - - #: Owner of the thread, every type should set it - self.owner = None - - @property - def user(self): - return primary_uid(self.owner) - - def getProgress(self): - """ retrieves progress information about the current running task - - :return: :class:`ProgressInfo` - """ - - # Debug Stuff - def writeDebugReport(self, name, pyfile=None, plugin=None): - """ writes a debug report to disk """ - - dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S")) - if pyfile: - dump = self.getFileDump(pyfile) - else: - dump = self.getPluginDump(plugin) - - try: - import zipfile - - zip = zipfile.ZipFile(dump_name, "w") - - if exists(join("tmp", name)): - for f in listdir(join("tmp", name)): - try: - # avoid encoding errors - zip.write(join("tmp", name, f), save_join(name, f)) - except: - pass - - info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime()) - info.external_attr = 0644 << 16L # change permissions - zip.writestr(info, dump) - - info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime()) - info.external_attr = 0644 << 16L - zip.writestr(info, self.getSystemDump()) - - zip.close() - - if not stat(dump_name).st_size: - raise Exception("Empty Zipfile") - - except Exception, e: - self.log.debug("Error creating zip file: %s" % e) - - dump_name = dump_name.replace(".zip", ".txt") - f = open(dump_name, "wb") - f.write(dump) - f.close() - - self.log.info("Debug Report written to %s" % dump_name) - return dump_name - - def getFileDump(self, pyfile): - dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % ( - self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc()) - - tb = exc_info()[2] - stack = [] - while tb: - stack.append(tb.tb_frame) - tb = tb.tb_next - - for frame in stack[1:]: - dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name, - frame.f_code.co_filename, - frame.f_lineno) - - for key, value in frame.f_locals.items(): - dump += "\t%20s = " % key - try: - dump += pformat(value) + "\n" - except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - - del frame - - del stack #delete it just to be sure... - - dump += "\n\nPLUGIN OBJECT DUMP: \n\n" - - for name in dir(pyfile.plugin): - attr = getattr(pyfile.plugin, name) - if not name.endswith("__") and type(attr) != MethodType: - dump += "\t%20s = " % name - try: - dump += pformat(attr) + "\n" - except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - - dump += "\nPYFILE OBJECT DUMP: \n\n" - - for name in dir(pyfile): - attr = getattr(pyfile, name) - if not name.endswith("__") and type(attr) != MethodType: - dump += "\t%20s = " % name - try: - dump += pformat(attr) + "\n" - except Exception, e: - dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n" - - dump += "\n\nCONFIG: \n\n" - dump += pformat(self.m.core.config.values) + "\n" - - return dump - - #TODO - def getPluginDump(self, plugin): - return "" - - def getSystemDump(self): - return "" diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py deleted file mode 100644 index 39448a620..000000000 --- a/module/threads/DecrypterThread.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from time import sleep -from traceback import print_exc - -from module.utils import uniqify -from module.plugins.Base import Retry -from module.plugins.Crypter import Package - -from BaseThread import BaseThread - -class DecrypterThread(BaseThread): - """thread for decrypting""" - - def __init__(self, manager, data, pid): - """constructor""" - BaseThread.__init__(self, manager) - self.data = data - self.pid = pid - - self.start() - - def run(self): - plugin_map = {} - for url, plugin in self.data: - if plugin in plugin_map: - plugin_map[plugin].append(url) - else: - plugin_map[plugin] = [url] - - self.decrypt(plugin_map) - - def decrypt(self, plugin_map): - pack = self.m.core.files.getPackage(self.pid) - result = [] - - for name, urls in plugin_map.iteritems(): - klass = self.m.core.pluginManager.loadClass("crypter", name) - plugin = klass(self.m.core, pack, pack.password) - plugin_result = [] - - try: - try: - plugin_result = plugin._decrypt(urls) - except Retry: - sleep(1) - plugin_result = plugin._decrypt(urls) - except Exception, e: - plugin.logError(_("Decrypting failed"), e) - if self.m.core.debug: - print_exc() - self.writeDebugReport(plugin.__name__, plugin=plugin) - - plugin.logDebug("Decrypted", plugin_result) - result.extend(plugin_result) - - #TODO - result = uniqify(result) - pack_names = {} - urls = [] - - for p in result: - if isinstance(p, Package): - if p.name in pack_names: - pack_names[p.name].urls.extend(p.urls) - else: - pack_names[p.name] = p - else: - urls.append(p) - - if urls: - self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name}) - self.m.core.api.addFiles(self.pid, urls) - - for p in pack_names.itervalues(): - self.m.core.api.addPackage(p.name, p.urls, pack.password) - - if not result: - self.log.info(_("No links decrypted")) - diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py deleted file mode 100644 index cf59c5639..000000000 --- a/module/threads/DownloadThread.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, see <http://www.gnu.org/licenses/>. - - @author: RaNaN -""" - -from Queue import Queue -from time import sleep, time -from traceback import print_exc -from sys import exc_clear -from pycurl import error - -from module.plugins.Base import Fail, Retry, Abort -from module.plugins.Hoster import Reconnect, SkipDownload -from module.network.HTTPRequest import BadHeader - -from BaseThread import BaseThread - -class DownloadThread(BaseThread): - """thread for downloading files from 'real' hoster plugins""" - - def __init__(self, manager): - """Constructor""" - BaseThread.__init__(self, manager) - - self.queue = Queue() # job queue - self.active = None - - self.start() - - def run(self): - """run method""" - pyfile = None - - while True: - del pyfile - self.active = self.queue.get() - pyfile = self.active - - if self.active == "quit": - self.active = None - self.m.threads.remove(self) - return True - - try: - if not pyfile.hasPlugin(): continue - #this pyfile was deleted while queuing - - pyfile.plugin.checkForSameFiles(starting=True) - self.log.info(_("Download starts: %s" % pyfile.name)) - - # start download - self.core.addonManager.downloadPreparing(pyfile) - pyfile.plugin.preprocessing(self) - - self.log.info(_("Download finished: %s") % pyfile.name) - self.core.addonManager.downloadFinished(pyfile) - self.core.files.checkPackageFinished(pyfile) - - except NotImplementedError: - self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname) - pyfile.setStatus("failed") - pyfile.error = "Plugin does not work" - self.clean(pyfile) - continue - - except Abort: - try: - self.log.info(_("Download aborted: %s") % pyfile.name) - except: - pass - - pyfile.setStatus("aborted") - - self.clean(pyfile) - continue - - except Reconnect: - self.queue.put(pyfile) - #pyfile.req.clearCookies() - - while self.m.reconnecting.isSet(): - sleep(0.5) - - continue - - except Retry, e: - reason = e.args[0] - self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason}) - self.queue.put(pyfile) - continue - except Fail, e: - msg = e.args[0] - - # TODO: activate former skipped downloads - - if msg == "offline": - pyfile.setStatus("offline") - self.log.warning(_("Download is offline: %s") % pyfile.name) - elif msg == "temp. offline": - pyfile.setStatus("temp. offline") - self.log.warning(_("Download is temporary offline: %s") % pyfile.name) - else: - pyfile.setStatus("failed") - self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg}) - pyfile.error = msg - - self.core.addonManager.downloadFailed(pyfile) - self.clean(pyfile) - continue - - except error, e: - if len(e.args) == 2: - code, msg = e.args - else: - code = 0 - msg = e.args - - self.log.debug("pycurl exception %s: %s" % (code, msg)) - - if code in (7, 18, 28, 52, 56): - self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry.")) - wait = time() + 60 - - pyfile.waitUntil = wait - pyfile.setStatus("waiting") - while time() < wait: - sleep(1) - if pyfile.abort: - break - - if pyfile.abort: - self.log.info(_("Download aborted: %s") % pyfile.name) - pyfile.setStatus("aborted") - - self.clean(pyfile) - else: - self.queue.put(pyfile) - - continue - - else: - pyfile.setStatus("failed") - self.log.error("pycurl error %s: %s" % (code, msg)) - if self.core.debug: - print_exc() - self.writeDebugReport(pyfile.plugin.__name__, pyfile) - - self.core.addonManager.downloadFailed(pyfile) - - self.clean(pyfile) - continue - - except SkipDownload, e: - pyfile.setStatus("skipped") - - self.log.info(_("Download skipped: %(name)s due to %(plugin)s") - % {"name": pyfile.name, "plugin": e.message}) - - self.clean(pyfile) - - self.core.files.checkPackageFinished(pyfile) - - self.active = False - self.core.files.save() - - continue - - - except Exception, e: - if isinstance(e, BadHeader) and e.code == 500: - pyfile.setStatus("temp. offline") - self.log.warning(_("Download is temporary offline: %s") % pyfile.name) - pyfile.error = _("Internal Server Error") - - else: - pyfile.setStatus("failed") - self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)}) - pyfile.error = str(e) - - if self.core.debug: - print_exc() - self.writeDebugReport(pyfile.plugin.__name__, pyfile) - - self.core.addonManager.downloadFailed(pyfile) - self.clean(pyfile) - continue - - finally: - self.core.files.save() - pyfile.checkIfProcessed() - exc_clear() - - - #pyfile.plugin.req.clean() - - self.active = False - pyfile.finishIfDone() - self.core.files.save() - - def getProgress(self): - if self.active: - return self.active.getProgressInfo() - - - def put(self, job): - """assign a job to the thread""" - self.queue.put(job) - - def clean(self, pyfile): - """ set thread inactive and release pyfile """ - self.active = False - pyfile.release() - - def stop(self): - """stops the thread""" - self.put("quit") diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py deleted file mode 100644 index bf5bb5777..000000000 --- a/module/threads/InfoThread.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -from time import time -from traceback import print_exc - -from module.Api import LinkStatus -from module.utils.packagetools import parseNames -from module.utils import has_method, accumulate - -from BaseThread import BaseThread - -class InfoThread(BaseThread): - def __init__(self, manager, data, pid=-1, rid=-1): - """Constructor""" - BaseThread.__init__(self, manager) - - self.data = data - self.pid = pid # package id - # [ .. (name, plugin) .. ] - - self.rid = rid #result id - - self.cache = [] #accumulated data - - self.start() - - def run(self): - """run method""" - - plugins = accumulate(self.data) - crypter = {} - - # filter out crypter plugins - for name in self.m.core.pluginManager.getPlugins("crypter"): - if name in plugins: - crypter[name] = plugins[name] - del plugins[name] - - #directly write to database - if self.pid > -1: - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPluginModule(pluginname) - klass = self.m.core.pluginManager.getPluginClass(pluginname) - if has_method(klass, "getInfo"): - self.fetchForPlugin(pluginname, klass, urls, self.updateDB) - self.m.core.files.save() - elif has_method(plugin, "getInfo"): - self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead") - self.fetchForPlugin(pluginname, plugin, urls, self.updateDB) - self.m.core.files.save() - - else: #post the results - for name, urls in crypter: - #attach container content - try: - data = self.decrypt(name, urls) - except: - print_exc() - self.m.log.error("Could not decrypt container.") - data = [] - - accumulate(data, plugins) - - self.m.infoResults[self.rid] = {} - - for pluginname, urls in plugins.iteritems(): - plugin = self.m.core.pluginManager.getPlugin(pluginname, True) - klass = getattr(plugin, pluginname) - if has_method(klass, "getInfo"): - self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) - #force to process cache - if self.cache: - self.updateResult(pluginname, [], True) - elif has_method(plugin, "getInfo"): - self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead") - self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True) - #force to process cache - if self.cache: - self.updateResult(pluginname, [], True) - else: - #generate default result - result = [(url, 0, 3, url) for url in urls] - - self.updateResult(pluginname, result, True) - - self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {} - - self.m.timestamp = time() + 5 * 60 - - - def updateDB(self, plugin, result): - self.m.core.files.updateFileInfo(result, self.pid) - - def updateResult(self, plugin, result, force=False): - #parse package name and generate result - #accumulate results - - self.cache.extend(result) - - if len(self.cache) >= 20 or force: - #used for package generating - tmp = [(name, (url, LinkStatus(name, plugin, "unknown", status, int(size)))) - for name, size, status, url in self.cache] - - data = parseNames(tmp) - result = {} - for k, v in data.iteritems(): - for url, status in v: - status.packagename = k - result[url] = status - - self.m.setInfoResults(self.rid, result) - - self.cache = [] - - def updateCache(self, plugin, result): - self.cache.extend(result) - - def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None): - try: - result = [] #result loaded from cache - process = [] #urls to process - for url in urls: - if url in self.m.infoCache: - result.append(self.m.infoCache[url]) - else: - process.append(url) - - if result: - self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname)) - cb(pluginname, result) - - if process: - self.m.log.debug("Run Info Fetching for %s" % pluginname) - for result in plugin.getInfo(process): - #result = [ .. (name, size, status, url) .. ] - if not type(result) == list: result = [result] - - for res in result: - self.m.infoCache[res[3]] = res - - cb(pluginname, result) - - self.m.log.debug("Finished Info Fetching for %s" % pluginname) - except Exception, e: - self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") % - {"name": pluginname, "err": str(e)}) - if self.m.core.debug: - print_exc() - - # generate default results - if err: - result = [(url, 0, 3, url) for url in urls] - cb(pluginname, result) - - - def decrypt(self, plugin, urls): - self.m.log.debug("Pre decrypting %s" % plugin) - klass = self.m.core.pluginManager.loadClass("crypter", plugin) - - # only decrypt files - if has_method(klass, "decryptFile"): - urls = klass.decrypt(urls) - data, crypter = self.m.core.pluginManager.parseUrls(urls) - return data - - return [] diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py deleted file mode 100644 index f67179d08..000000000 --- a/module/threads/ThreadManager.py +++ /dev/null @@ -1,313 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -############################################################################### -# Copyright(c) 2008-2012 pyLoad Team -# http://www.pyload.org -# -# This file is part of pyLoad. -# pyLoad is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# Subjected to the terms and conditions in LICENSE -# -# @author: RaNaN -############################################################################### - -from os.path import exists, join -import re -from subprocess import Popen -from threading import Event, Lock -from time import sleep, time -from traceback import print_exc -from random import choice - -import pycurl - -from module.datatypes.PyFile import PyFile -from module.network.RequestFactory import getURL -from module.utils import lock, uniqify -from module.utils.fs import free_space - -from DecrypterThread import DecrypterThread -from DownloadThread import DownloadThread -from InfoThread import InfoThread - -class ThreadManager: - """manages the download threads, assign jobs, reconnect etc""" - - - def __init__(self, core): - """Constructor""" - self.core = core - self.log = core.log - - self.threads = [] # thread list - self.localThreads = [] #addon+decrypter threads - - self.pause = True - - self.reconnecting = Event() - self.reconnecting.clear() - self.downloaded = 0 #number of files downloaded since last cleanup - - self.lock = Lock() - - # some operations require to fetch url info from hoster, so we caching them so it wont be done twice - # contains a timestamp and will be purged after timeout - self.infoCache = {} - - # pool of ids for online check - self.resultIDs = 0 - - # threads which are fetching hoster results - self.infoResults = {} - # timeout for cache purge - self.timestamp = 0 - - pycurl.global_init(pycurl.GLOBAL_DEFAULT) - - for i in range(self.core.config.get("download", "max_downloads")): - self.createThread() - - - def createThread(self): - """create a download thread""" - - thread = DownloadThread(self) - self.threads.append(thread) - - def createInfoThread(self, data, pid): - """ start a thread which fetches online status and other info's """ - self.timestamp = time() + 5 * 60 - if data: InfoThread(self, data, pid) - - @lock - def createResultThread(self, data): - """ creates a thread to fetch online status, returns result id """ - self.timestamp = time() + 5 * 60 - - rid = self.resultIDs - self.resultIDs += 1 - - InfoThread(self, data, rid=rid) - - return rid - - @lock - def createDecryptThread(self, data, pid): - """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" - if data: DecrypterThread(self, data, pid) - - - @lock - def getInfoResult(self, rid): - """returns result and clears it""" - self.timestamp = time() + 5 * 60 - - if rid in self.infoResults: - data = self.infoResults[rid] - self.infoResults[rid] = {} - return data - else: - return {} - - @lock - def setInfoResults(self, rid, result): - self.infoResults[rid].update(result) - - def getActiveDownloads(self, user=None): - # TODO: user context - return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] - - def getProgressList(self, user=None): - info = [] - - # TODO: local threads can create multiple progresses - for thread in self.threads + self.localThreads: - # skip if not belong to current user - if user and thread.user != user: continue - - progress = thread.getProgress() - if progress: info.append(progress) - - return info - - def getActiveFiles(self): - active = self.getActiveDownloads() - - for t in self.localThreads: - active.extend(t.getActiveFiles()) - - return active - - def processingIds(self): - """get a id list of all pyfiles processed""" - return [x.id for x in self.getActiveFiles()] - - def work(self): - """run all task which have to be done (this is for repetetive call by core)""" - try: - self.tryReconnect() - except Exception, e: - self.log.error(_("Reconnect Failed: %s") % str(e) ) - self.reconnecting.clear() - self.core.print_exc() - - self.checkThreadCount() - - try: - self.assignJob() - except Exception, e: - self.log.warning("Assign job error", e) - self.core.print_exc() - - sleep(0.5) - self.assignJob() - #it may be failed non critical so we try it again - - if (self.infoCache or self.infoResults) and self.timestamp < time(): - self.infoCache.clear() - self.infoResults.clear() - self.log.debug("Cleared Result cache") - - def tryReconnect(self): - """checks if reconnect needed""" - - if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): - return False - - active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - - if not (0 < active.count(True) == len(active)): - return False - - if not exists(self.core.config['reconnect']['method']): - if exists(join(pypath, self.core.config['reconnect']['method'])): - self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) - else: - self.core.config["reconnect"]["activated"] = False - self.log.warning(_("Reconnect script not found!")) - return - - self.reconnecting.set() - - #Do reconnect - self.log.info(_("Starting reconnect")) - - while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: - sleep(0.25) - - ip = self.getIP() - - self.core.addonManager.beforeReconnecting(ip) - - self.log.debug("Old IP: %s" % ip) - - try: - reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) - except: - self.log.warning(_("Failed executing reconnect script!")) - self.core.config["reconnect"]["activated"] = False - self.reconnecting.clear() - if self.core.debug: - print_exc() - return - - reconn.wait() - sleep(1) - ip = self.getIP() - self.core.addonManager.afterReconnecting(ip) - - self.log.info(_("Reconnected, new IP: %s") % ip) - - self.reconnecting.clear() - - def getIP(self): - """retrieve current ip""" - services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), - ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")] - - ip = "" - for i in range(10): - try: - sv = choice(services) - ip = getURL(sv[0]) - ip = re.match(sv[1], ip).group(1) - break - except: - ip = "" - sleep(1) - - return ip - - def checkThreadCount(self): - """checks if there is a need for increasing or reducing thread count""" - - if len(self.threads) == self.core.config.get("download", "max_downloads"): - return True - elif len(self.threads) < self.core.config.get("download", "max_downloads"): - self.createThread() - else: - free = [x for x in self.threads if not x.active] - if free: - free[0].put("quit") - - - def cleanPycurl(self): - """ make a global curl cleanup (currently unused) """ - if self.processingIds(): - return False - pycurl.global_cleanup() - pycurl.global_init(pycurl.GLOBAL_DEFAULT) - self.downloaded = 0 - self.log.debug("Cleaned up pycurl") - return True - - - def assignJob(self): - """assign a job to a thread if possible""" - - if self.pause or not self.core.api.isTimeDownload(): return - - #if self.downloaded > 20: - # if not self.cleanPyCurl(): return - - free = [x for x in self.threads if not x.active] - - inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()] - inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse] - occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) - - job = self.core.files.getJob(occ) - if job: - try: - job.initPlugin() - except Exception, e: - self.log.critical(str(e)) - print_exc() - job.setStatus("failed") - job.error = str(e) - job.release() - return - - spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 - if spaceLeft < self.core.config["general"]["min_free_space"]: - self.log.warning(_("Not enough space left on device")) - self.pause = True - - if free and not self.pause: - thread = free[0] - #self.downloaded += 1 - thread.put(job) - else: - #put job back - if occ not in self.core.files.jobCache: - self.core.files.jobCache[occ] = [] - self.core.files.jobCache[occ].append(job.id) - - def cleanup(self): - """do global cleanup, should be called when finished with pycurl""" - pycurl.global_cleanup() diff --git a/module/threads/__init__.py b/module/threads/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/module/threads/__init__.py +++ /dev/null |