diff options
author | RaNaN <Mast3rRaNaN@hotmail.de> | 2010-07-29 12:09:42 +0200 |
---|---|---|
committer | RaNaN <Mast3rRaNaN@hotmail.de> | 2010-07-29 12:09:42 +0200 |
commit | 2ba07aa53d2af572af2c5a43e77725abd46e1b13 (patch) | |
tree | 89ff915a3a476dd9431d38c6e2b39b9de1aeb0f5 /module/ThreadManager.py | |
parent | Added tag working for changeset 3cca18acfe7d (diff) | |
download | pyload-2ba07aa53d2af572af2c5a43e77725abd46e1b13.tar.xz |
many new stuff, some things already working
Diffstat (limited to 'module/ThreadManager.py')
-rw-r--r-- | module/ThreadManager.py | 348 |
1 files changed, 94 insertions, 254 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py index 4e2beaf49..67ea0d8d8 100644 --- a/module/ThreadManager.py +++ b/module/ThreadManager.py @@ -14,261 +14,101 @@ You should have received a copy of the GNU General Public License along with this program; if not, see <http://www.gnu.org/licenses/>. - - @author: mkaay - @author: spoob - @author: sebnapi + @author: RaNaN - @version: v0.3.2 """ -from __future__ import with_statement -from os.path import exists -import re -import subprocess -from threading import RLock, Thread -from time import sleep -from module.network.Request import getURL -from module.DownloadThread import DownloadThread -from module.SpeedManager import SpeedManager - -class ThreadManager(Thread): - def __init__(self, parent): - Thread.__init__(self) - self.parent = parent - self.list = parent.file_list #file list - self.threads = [] - self.lock = RLock() - self.py_downloading = [] # files downloading - self.occ_plugins = [] #occupied plugins - self.pause = True - self.reconnecting = False - - self.speedManager = SpeedManager(self) - - def run(self): - while True: - if (len(self.threads) < int(self.parent.config['general']['max_downloads']) or self.isDecryptWaiting()) and not self.pause: - job = self.getJob() - if job: - thread = self.createThread(job) - thread.start() - sleep(1) - - def createThread(self, job): - """ creates thread for Py_Load_File and append thread to self.threads - """ - thread = DownloadThread(self, job) - self.threads.append(thread) - return thread - - def removeThread(self, thread): - self.threads.remove(thread) - - def getJob(self): - """return job if suitable, otherwise send thread idle""" - - if not self.parent.server_methods.is_time_download() or self.pause or self.reconnecting or self.list.queueEmpty(): #conditions when threads dont download - return None - - if self.parent.freeSpace() < self.parent.config["general"]["min_free_space"]: - self.parent.logger.debug(_("minimal free space exceeded")) - return None - - self.initReconnect() - - self.lock.acquire() - - pyfile = None - pyfiles = self.list.getDownloadList(self.occ_plugins) - - if pyfiles: - pyfile = pyfiles[0] - self.py_downloading.append(pyfile) - self.parent.hookManager.downloadStarts(pyfile) - if not pyfile.plugin.multi_dl: - self.occ_plugins.append(pyfile.plugin.__name__) - pyfile.active = True - if pyfile.plugin.__type__ == "container": - self.parent.logger.info(_("Get links from: %s") % pyfile.url) - else: - self.parent.logger.info(_("Download starts: %s") % pyfile.url) - - self.lock.release() - return pyfile - - def isDecryptWaiting(self): - pyfiles = self.list.getDownloadList(self.occ_plugins) - for pyfile in pyfiles: - if pyfile.plugin.__type__ == "container": - return True - return False - - def handleNewInterface(self, pyfile): - plugin = pyfile.plugin - if plugin.__type__ == "container": - if plugin.createNewPackage(): - packages = plugin.getPackages() - if len(packages) == 1: - self.parent.logger.info(_("1 new package from %s") % (pyfile.status.filename,)) - else: - self.parent.logger.info(_("%i new packages from %s") % (len(packages), pyfile.status.filename)) - for name, links in packages: - if not name: - name = pyfile.status.filename - pid = self.list.packager.addNewPackage(name) - for link in links: - newFile = self.list.collector.addLink(link) - self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile)) - if len(links) == 1: - self.parent.logger.info(_("1 link in %s") % (name,)) - else: - self.parent.logger.info(_("%i links in %s") % (len(links), name)) - else: - pass - self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"]) - - def jobFinished(self, pyfile): - """manage completing download""" - self.lock.acquire() - - if not pyfile.plugin.multi_dl: - self.occ_plugins.remove(pyfile.plugin.__name__) - - pyfile.active = False - - if not pyfile.status == "reconnected": - try: - pyfile.plugin.req.pycurl.close() - except: - pass - - self.py_downloading.remove(pyfile) - - if pyfile.status.type == "finished": - if hasattr(pyfile.plugin, "__interface__") and pyfile.plugin.__interface__ >= 2: - self.handleNewInterface(pyfile) - elif pyfile.plugin.__type__ == "container": - newLinks = 0 - if pyfile.plugin.links: - if isinstance(pyfile.plugin.links, dict): - packmap = {} - for packname in pyfile.plugin.links.keys(): - packmap[packname] = self.list.packager.addNewPackage(packname) - for packname, links in pyfile.plugin.links.items(): - pid = packmap[packname] - for link in links: - newFile = self.list.collector.addLink(link) - self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile)) - self.list.packager.pushPackage2Queue(pid) - newLinks += 1 - else: - for link in pyfile.plugin.links: - newFile = self.list.collector.addLink(link) - pid = pyfile.package.data["id"] - self.list.packager.addFileToPackage(pyfile.package.data["id"], self.list.collector.popFile(newFile)) - newLinks += 1 - self.list.packager.pushPackage2Queue(pid) - - self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"]) - - if newLinks: - self.parent.logger.info(_("Parsed links from %s: %i") % (pyfile.status.filename, newLinks)) - else: - self.parent.logger.info(_("No links in %s") % pyfile.status.filename) - #~ self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.id) - #~ for link in pyfile.plugin.links: - #~ id = self.list.collector.addLink(link) - #~ pyfile.packager.pullOutPackage(pyfile.package.id) - #~ pyfile.packager.addFileToPackage(pyfile.package.id, pyfile.collector.popFile(id)) - else: - packFinished = True - for packfile in pyfile.package.files: - if packfile.status.type != "finished": - packFinished = False - break - - self.parent.logger.info(_("Download finished: %s") % pyfile.url) - if packFinished: - self.parent.logger.info(_("Package finished: %s") % pyfile.package.data['package_name']) - self.parent.hookManager.packageFinished(pyfile.package) - - elif pyfile.status.type == "reconnected": - pyfile.plugin.req.init_curl() - - elif pyfile.status.type == "failed": - self.parent.logger.warning(_("Download failed: %s | %s") % (pyfile.url, pyfile.status.error)) - with open(self.parent.config['general']['failed_file'], 'a') as f: - f.write(pyfile.url + "\n") - - elif pyfile.status.type == "aborted": - self.parent.logger.info(_("Download aborted: %s") % pyfile.url) - - self.list.save() - - self.parent.hookManager.downloadFinished(pyfile) - - self.lock.release() - return True - - def initReconnect(self): - """initialise a reonnect""" - if not self.parent.config['reconnect']['activated'] or self.reconnecting or not self.parent.server_methods.is_time_reconnect(): - return False - - if not exists(self.parent.config['reconnect']['method']): - self.parent.logger.info(self.parent.config['reconnect']['method'] + " not found") - self.parent.config['reconnect']['activated'] = False - return False - - self.lock.acquire() - - if self.checkReconnect(): - self.reconnecting = True - self.reconnect() - sleep(1.1) - - self.reconnecting = False - self.lock.release() - return True - - self.lock.release() - return False - - def checkReconnect(self): - """checks if all files want reconnect""" - - if not self.py_downloading: - return False - - i = 0 - for obj in self.py_downloading: - if obj.status.want_reconnect: - i += 1 - - if len(self.py_downloading) == i: - return True - else: - return False - - def reconnect(self): - self.parent.logger.info(_("Starting reconnect")) - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) - self.parent.hookManager.beforeReconnecting(ip) - reconn = subprocess.Popen(self.parent.config['reconnect']['method'])#, stdout=subprocess.PIPE) - reconn.wait() - sleep(1) - ip = "" - while ip == "": - try: - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #versuchen neue ip aus zu lesen - except: - ip = "" - sleep(1) - self.parent.hookManager.afterReconnecting(ip) - self.parent.logger.info(_("Reconnected, new IP: %s") % ip) +from threading import Event +import PluginThread + +######################################################################## +class ThreadManager: + """manages the download threads, assign jobs, reconnect etc""" + + #---------------------------------------------------------------------- + def __init__(self, core): + """Constructor""" + self.core = core + self.log = core.log + + self.threads = [] # thread list + self.localThreads = [] #hook+decrypter threads + + self.pause = True + + self.reconnecting = Event() + self.reconnecting.clear() + + for i in range(0, self.core.config.get("general","max_downloads") ): + self.createThread() + + + + #---------------------------------------------------------------------- + def createThread(self): + """create a download thread""" + + thread = PluginThread.DownloadThread(self) + self.threads.append(thread) + + #---------------------------------------------------------------------- + def downloadingIds(self): + """get a list of the currently downloading pyfile's ids""" + return [x.active.id for x in self.threads if x.active] + + #---------------------------------------------------------------------- + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.active.id for x in self.threads+self.localThreads if x.active] + + + #---------------------------------------------------------------------- + def work(self): + """run all task which have to be done (this is for repetivive call by core)""" + + self.checkReconnect() + self.checkThreadCount() + self.assingJob() + + #---------------------------------------------------------------------- + def checkReconnect(self): + """checks if reconnect needed""" + pass + + #---------------------------------------------------------------------- + def checkThreadCount(self): + """checks if there are need for increasing or reducing thread count""" + + if len(self.threads) == self.core.config.get("general", "max_downloads"): + return True + elif len(self.threads) < self.core.config.get("general", "max_downloads"): + self.createThread() + else: + #@TODO: close thread + pass + + + #---------------------------------------------------------------------- + def assingJob(self): + """assing a job to a thread if possible""" + + if self.pause: return + + free = [x for x in self.threads if not x.active] + + if free: + thread = free[0] + + occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ] + occ.sort() + occ = set(occ) + job = self.core.files.getJob(tuple(occ)) + if job: + job.initPlugin() + thread.put(job) + + + + - def stopAllDownloads(self): - self.pause = True - for pyfile in self.py_downloading: - pyfile.plugin.req.abort = True +
\ No newline at end of file |