diff options
author | mkaay <mkaay@mkaay.de> | 2010-05-04 18:14:58 +0200 |
---|---|---|
committer | mkaay <mkaay@mkaay.de> | 2010-05-04 18:14:58 +0200 |
commit | 74f2fa861eb59ec1675de8a217b0265ec927815b (patch) | |
tree | 4582320cc59afe8fdcfd1ef12422ccdd6608f71b /module/ThreadManager.py | |
parent | config dir fix (diff) | |
download | pyload-74f2fa861eb59ec1675de8a217b0265ec927815b.tar.xz |
better threadmanager
Diffstat (limited to 'module/ThreadManager.py')
-rw-r--r-- | module/ThreadManager.py | 229 |
1 files changed, 229 insertions, 0 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py new file mode 100644 index 000000000..ab0f99cfa --- /dev/null +++ b/module/ThreadManager.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, + or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see <http://www.gnu.org/licenses/>. + + @author: mkaay + @author: spoob + @author: sebnapi + @author: RaNaN + @version: v0.3.2 +""" + +from __future__ import with_statement +from os.path import exists +import re +import subprocess +from threading import RLock, Thread +from time import sleep +from module.network.Request import getURL +from module.DownloadThread import DownloadThread +from module.SpeedManager import SpeedManager + +class ThreadManager(Thread): + def __init__(self, parent): + Thread.__init__(self) + self.parent = parent + self.list = parent.file_list #file list + self.threads = [] + self.lock = RLock() + self.py_downloading = [] # files downloading + self.occ_plugins = [] #occupied plugins + self.pause = True + self.reconnecting = False + + self.speedManager = SpeedManager(self) + + def run(self): + while True: + if len(self.threads) < int(self.parent.config['general']['max_downloads']) and not self.pause: + job = self.getJob() + if job: + thread = self.createThread(job) + thread.start() + sleep(1) + + def createThread(self, job): + """ creates thread for Py_Load_File and append thread to self.threads + """ + thread = DownloadThread(self, job) + self.threads.append(thread) + return thread + + def removeThread(self, thread): + self.threads.remove(thread) + + def getJob(self): + """return job if suitable, otherwise send thread idle""" + + if not self.parent.server_methods.is_time_download() or self.pause or self.reconnecting or self.list.queueEmpty(): #conditions when threads dont download + return None + + if self.parent.freeSpace() < self.parent.config["general"]["min_free_space"]: + self.parent.logger.debug(_("minimal free space exceeded")) + return None + + self.initReconnect() + + self.lock.acquire() + + pyfile = None + pyfiles = self.list.getDownloadList(self.occ_plugins) + + if pyfiles: + pyfile = pyfiles[0] + self.py_downloading.append(pyfile) + self.parent.hookManager.downloadStarts(pyfile) + if not pyfile.plugin.multi_dl: + self.occ_plugins.append(pyfile.modul.__name__) + pyfile.active = True + if pyfile.plugin.props['type'] == "container": + self.parent.logger.info(_("Get links from: %s") % pyfile.url) + else: + self.parent.logger.info(_("Download starts: %s") % pyfile.url) + + self.lock.release() + return pyfile + + def jobFinished(self, pyfile): + """manage completing download""" + self.lock.acquire() + + if not pyfile.plugin.multi_dl: + self.occ_plugins.remove(pyfile.modul.__name__) + + pyfile.active = False + + if not pyfile.status == "reconnected": + try: + pyfile.plugin.req.pycurl.close() + except: + pass + + self.py_downloading.remove(pyfile) + + if pyfile.status.type == "finished": + if pyfile.plugin.props['type'] == "container": + newLinks = 0 + if pyfile.plugin.links: + if isinstance(pyfile.plugin.links, dict): + packmap = {} + for packname in pyfile.plugin.links.keys(): + packmap[packname] = self.list.packager.addNewPackage(packname) + for packname, links in pyfile.plugin.links.items(): + pid = packmap[packname] + for link in links: + newFile = self.list.collector.addLink(link) + self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile)) + newLinks += 1 + else: + for link in pyfile.plugin.links: + newFile = self.list.collector.addLink(link) + self.list.packager.addFileToPackage(pyfile.package.data["id"], self.list.collector.popFile(newFile)) + newLinks += 1 + #self.list.packager.pushPackage2Queue(pyfile.package.data["id"]) + self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"]) + + if newLinks: + self.parent.logger.info(_("Parsed links from %s: %i") % (pyfile.status.filename, newLinks)) + else: + self.parent.logger.info(_("No links in %s") % pyfile.status.filename) + #~ self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.id) + #~ for link in pyfile.plugin.links: + #~ id = self.list.collector.addLink(link) + #~ pyfile.packager.pullOutPackage(pyfile.package.id) + #~ pyfile.packager.addFileToPackage(pyfile.package.id, pyfile.collector.popFile(id)) + else: + self.parent.logger.info(_("Download finished: %s") % pyfile.url) + + elif pyfile.status.type == "reconnected": + pyfile.plugin.req.init_curl() + + elif pyfile.status.type == "failed": + self.parent.logger.warning(_("Download failed: %s | %s") % (pyfile.url, pyfile.status.error)) + with open(self.parent.config['general']['failed_file'], 'a') as f: + f.write(pyfile.url + "\n") + + elif pyfile.status.type == "aborted": + self.parent.logger.info(_("Download aborted: %s") % pyfile.url) + + self.list.save() + + self.parent.hookManager.downloadFinished(pyfile) + + self.lock.release() + return True + + def initReconnect(self): + """initialise a reonnect""" + if not self.parent.config['reconnect']['activated'] or self.reconnecting or not self.parent.server_methods.is_time_reconnect(): + return False + + if not exists(self.parent.config['reconnect']['method']): + self.parent.logger.info(self.parent.config['reconnect']['method'] + " not found") + self.parent.config['reconnect']['activated'] = False + return False + + self.lock.acquire() + + if self.checkReconnect(): + self.reconnecting = True + self.reconnect() + time.sleep(1.1) + + self.reconnecting = False + self.lock.release() + return True + + self.lock.release() + return False + + def checkReconnect(self): + """checks if all files want reconnect""" + + if not self.py_downloading: + return False + + i = 0 + for obj in self.py_downloading: + if obj.status.want_reconnect: + i += 1 + + if len(self.py_downloading) == i: + return True + else: + return False + + def reconnect(self): + self.parent.logger.info(_("Starting reconnect")) + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) + self.parent.hookManager.beforeReconnecting(ip) + reconn = subprocess.Popen(self.parent.config['reconnect']['method'])#, stdout=subprocess.PIPE) + reconn.wait() + time.sleep(1) + ip = "" + while ip == "": + try: + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #versuchen neue ip aus zu lesen + except: + ip = "" + time.sleep(1) + self.parent.hookManager.afterReconnecting(ip) + self.parent.logger.info(_("Reconnected, new IP: %s") % ip) + + def stopAllDownloads(self): + self.pause = True + for pyfile in self.py_downloading: + pyfile.plugin.req.abort = True |