diff options
Diffstat (limited to 'pyload/threads/ThreadManager.py')
-rw-r--r-- | pyload/threads/ThreadManager.py | 220 |
1 files changed, 8 insertions, 212 deletions
diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py index 298b0402d..f6cb3daea 100644 --- a/pyload/threads/ThreadManager.py +++ b/pyload/threads/ThreadManager.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- ############################################################################### -# Copyright(c) 2008-2013 pyLoad Team +# Copyright(c) 2008-2014 pyLoad Team # http://www.pyload.org # # This file is part of pyLoad. @@ -16,22 +16,11 @@ # @author: RaNaN ############################################################################### -from os.path import exists, join -import re -from subprocess import Popen -from threading import Event, RLock -from time import sleep, time -from traceback import print_exc -from random import choice +from threading import RLock +from time import time -from pyload.datatypes.PyFile import PyFile from pyload.datatypes.OnlineCheck import OnlineCheck -from pyload.network.RequestFactory import getURL -from pyload.utils import lock, uniqify, to_list -from pyload.utils.fs import free_space - -from DecrypterThread import DecrypterThread -from DownloadThread import DownloadThread +from pyload.utils import lock, to_list from InfoThread import InfoThread @@ -44,13 +33,6 @@ class ThreadManager: self.log = core.log self.threads = [] # thread list - self.localThreads = [] #addon+decrypter threads - - self.pause = True - - self.reconnecting = Event() - self.reconnecting.clear() - self.downloaded = 0 #number of files downloaded since last cleanup self.lock = RLock() @@ -67,24 +49,15 @@ class ThreadManager: # timeout for cache purge self.timestamp = 0 - for i in range(self.core.config.get("download", "max_downloads")): - self.createThread() - - def createThread(self): - """create a download thread""" - - thread = DownloadThread(self) - self.threads.append(thread) - @lock def addThread(self, thread): - self.localThreads.append(thread) + self.threads.append(thread) @lock def removeThread(self, thread): """ Remove a thread from the local list """ - if thread in self.localThreads: - self.localThreads.remove(thread) + if thread in self.threads: + self.threads.remove(thread) @lock def createInfoThread(self, data, pid): @@ -108,11 +81,6 @@ class ThreadManager: return rid @lock - def createDecryptThread(self, data, pid): - """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" - if data: DecrypterThread(self, data, pid) - - @lock def getInfoResult(self, rid): return self.infoResults.get(rid) @@ -120,14 +88,10 @@ class ThreadManager: self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner) oc.update(result) - def getActiveDownloads(self, user=None): - # TODO: user context - return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] - def getProgressList(self, user=None): info = [] - for thread in self.threads + self.localThreads: + for thread in self.threads: # skip if not belong to current user if user is not None and thread.owner != user: continue @@ -136,38 +100,8 @@ class ThreadManager: return info - def getActiveFiles(self): - active = self.getActiveDownloads() - - for t in self.localThreads: - active.extend(t.getActiveFiles()) - - return active - - def processingIds(self): - """get a id list of all pyfiles processed""" - return [x.id for x in self.getActiveFiles()] - def work(self): """run all task which have to be done (this is for repetitive call by core)""" - try: - self.tryReconnect() - except Exception, e: - self.log.error(_("Reconnect Failed: %s") % str(e)) - self.reconnecting.clear() - self.core.print_exc() - - self.checkThreadCount() - - try: - self.assignJob() - except Exception, e: - self.log.warning("Assign job error", e) - self.core.print_exc() - - sleep(0.5) - self.assignJob() - #it may be failed non critical so we try it again if self.infoCache and self.timestamp < time(): self.infoCache.clear() @@ -176,141 +110,3 @@ class ThreadManager: for rid in self.infoResults.keys(): if self.infoResults[rid].isStale(): del self.infoResults[rid] - - def tryReconnect(self): - """checks if reconnect needed""" - - if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()): - return False - - active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - - if not (0 < active.count(True) == len(active)): - return False - - if not exists(self.core.config['reconnect']['method']): - if exists(join(pypath, self.core.config['reconnect']['method'])): - self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) - else: - self.core.config["reconnect"]["activated"] = False - self.log.warning(_("Reconnect script not found!")) - return - - self.reconnecting.set() - - #Do reconnect - self.log.info(_("Starting reconnect")) - - while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: - sleep(0.25) - - ip = self.getIP() - - self.core.evm.dispatchEvent("reconnect:before", ip) - - self.log.debug("Old IP: %s" % ip) - - try: - reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE) - except: - self.log.warning(_("Failed executing reconnect script!")) - self.core.config["reconnect"]["activated"] = False - self.reconnecting.clear() - self.core.print_exc() - return - - reconn.wait() - sleep(1) - ip = self.getIP() - self.core.evm.dispatchEvent("reconnect:after", ip) - - self.log.info(_("Reconnected, new IP: %s") % ip) - - self.reconnecting.clear() - - def getIP(self): - """retrieve current ip""" - services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), - ("http://checkip.dyndns.org/", ".*Current IP Address: (\S+)</body>.*")] - - ip = "" - for i in range(10): - try: - sv = choice(services) - ip = getURL(sv[0]) - ip = re.match(sv[1], ip).group(1) - break - except: - ip = "" - sleep(1) - - return ip - - def checkThreadCount(self): - """checks if there is a need for increasing or reducing thread count""" - - if len(self.threads) == self.core.config.get("download", "max_downloads"): - return True - elif len(self.threads) < self.core.config.get("download", "max_downloads"): - self.createThread() - else: - free = [x for x in self.threads if not x.active] - if free: - free[0].put("quit") - - - def cleanPycurl(self): - """ make a global curl cleanup (currently unused) """ - if self.processingIds(): - return False - import pycurl - - pycurl.global_cleanup() - pycurl.global_init(pycurl.GLOBAL_DEFAULT) - self.downloaded = 0 - self.log.debug("Cleaned up pycurl") - return True - - - def assignJob(self): - """assign a job to a thread if possible""" - - if self.pause or not self.core.api.isTimeDownload(): return - - #if self.downloaded > 20: - # if not self.cleanPyCurl(): return - - free = [x for x in self.threads if not x.active] - - inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if - x.active and x.active.hasPlugin()] - inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in - inuse] - occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]]))) - - job = self.core.files.getJob(occ) - if job: - try: - job.initPlugin() - except Exception, e: - self.log.critical(str(e)) - print_exc() - job.setStatus("failed") - job.error = str(e) - job.release() - return - - spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 - if spaceLeft < self.core.config["general"]["min_free_space"]: - self.log.warning(_("Not enough space left on device")) - self.pause = True - - if free and not self.pause: - thread = free[0] - #self.downloaded += 1 - thread.put(job) - else: - #put job back - if occ not in self.core.files.jobCache: - self.core.files.jobCache[occ] = [] - self.core.files.jobCache[occ].append(job.id) |