summaryrefslogtreecommitdiffstats
path: root/module/ThreadManager.py
diff options
context:
space:
mode:
authorGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-07-29 12:09:42 +0200
committerGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-07-29 12:09:42 +0200
commit2ba07aa53d2af572af2c5a43e77725abd46e1b13 (patch)
tree89ff915a3a476dd9431d38c6e2b39b9de1aeb0f5 /module/ThreadManager.py
parentAdded tag working for changeset 3cca18acfe7d (diff)
downloadpyload-2ba07aa53d2af572af2c5a43e77725abd46e1b13.tar.xz
many new stuff, some things already working
Diffstat (limited to 'module/ThreadManager.py')
-rw-r--r--module/ThreadManager.py348
1 files changed, 94 insertions, 254 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py
index 4e2beaf49..67ea0d8d8 100644
--- a/module/ThreadManager.py
+++ b/module/ThreadManager.py
@@ -14,261 +14,101 @@
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
-
- @author: mkaay
- @author: spoob
- @author: sebnapi
+
@author: RaNaN
- @version: v0.3.2
"""
-from __future__ import with_statement
-from os.path import exists
-import re
-import subprocess
-from threading import RLock, Thread
-from time import sleep
-from module.network.Request import getURL
-from module.DownloadThread import DownloadThread
-from module.SpeedManager import SpeedManager
-
-class ThreadManager(Thread):
- def __init__(self, parent):
- Thread.__init__(self)
- self.parent = parent
- self.list = parent.file_list #file list
- self.threads = []
- self.lock = RLock()
- self.py_downloading = [] # files downloading
- self.occ_plugins = [] #occupied plugins
- self.pause = True
- self.reconnecting = False
-
- self.speedManager = SpeedManager(self)
-
- def run(self):
- while True:
- if (len(self.threads) < int(self.parent.config['general']['max_downloads']) or self.isDecryptWaiting()) and not self.pause:
- job = self.getJob()
- if job:
- thread = self.createThread(job)
- thread.start()
- sleep(1)
-
- def createThread(self, job):
- """ creates thread for Py_Load_File and append thread to self.threads
- """
- thread = DownloadThread(self, job)
- self.threads.append(thread)
- return thread
-
- def removeThread(self, thread):
- self.threads.remove(thread)
-
- def getJob(self):
- """return job if suitable, otherwise send thread idle"""
-
- if not self.parent.server_methods.is_time_download() or self.pause or self.reconnecting or self.list.queueEmpty(): #conditions when threads dont download
- return None
-
- if self.parent.freeSpace() < self.parent.config["general"]["min_free_space"]:
- self.parent.logger.debug(_("minimal free space exceeded"))
- return None
-
- self.initReconnect()
-
- self.lock.acquire()
-
- pyfile = None
- pyfiles = self.list.getDownloadList(self.occ_plugins)
-
- if pyfiles:
- pyfile = pyfiles[0]
- self.py_downloading.append(pyfile)
- self.parent.hookManager.downloadStarts(pyfile)
- if not pyfile.plugin.multi_dl:
- self.occ_plugins.append(pyfile.plugin.__name__)
- pyfile.active = True
- if pyfile.plugin.__type__ == "container":
- self.parent.logger.info(_("Get links from: %s") % pyfile.url)
- else:
- self.parent.logger.info(_("Download starts: %s") % pyfile.url)
-
- self.lock.release()
- return pyfile
-
- def isDecryptWaiting(self):
- pyfiles = self.list.getDownloadList(self.occ_plugins)
- for pyfile in pyfiles:
- if pyfile.plugin.__type__ == "container":
- return True
- return False
-
- def handleNewInterface(self, pyfile):
- plugin = pyfile.plugin
- if plugin.__type__ == "container":
- if plugin.createNewPackage():
- packages = plugin.getPackages()
- if len(packages) == 1:
- self.parent.logger.info(_("1 new package from %s") % (pyfile.status.filename,))
- else:
- self.parent.logger.info(_("%i new packages from %s") % (len(packages), pyfile.status.filename))
- for name, links in packages:
- if not name:
- name = pyfile.status.filename
- pid = self.list.packager.addNewPackage(name)
- for link in links:
- newFile = self.list.collector.addLink(link)
- self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile))
- if len(links) == 1:
- self.parent.logger.info(_("1 link in %s") % (name,))
- else:
- self.parent.logger.info(_("%i links in %s") % (len(links), name))
- else:
- pass
- self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"])
-
- def jobFinished(self, pyfile):
- """manage completing download"""
- self.lock.acquire()
-
- if not pyfile.plugin.multi_dl:
- self.occ_plugins.remove(pyfile.plugin.__name__)
-
- pyfile.active = False
-
- if not pyfile.status == "reconnected":
- try:
- pyfile.plugin.req.pycurl.close()
- except:
- pass
-
- self.py_downloading.remove(pyfile)
-
- if pyfile.status.type == "finished":
- if hasattr(pyfile.plugin, "__interface__") and pyfile.plugin.__interface__ >= 2:
- self.handleNewInterface(pyfile)
- elif pyfile.plugin.__type__ == "container":
- newLinks = 0
- if pyfile.plugin.links:
- if isinstance(pyfile.plugin.links, dict):
- packmap = {}
- for packname in pyfile.plugin.links.keys():
- packmap[packname] = self.list.packager.addNewPackage(packname)
- for packname, links in pyfile.plugin.links.items():
- pid = packmap[packname]
- for link in links:
- newFile = self.list.collector.addLink(link)
- self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile))
- self.list.packager.pushPackage2Queue(pid)
- newLinks += 1
- else:
- for link in pyfile.plugin.links:
- newFile = self.list.collector.addLink(link)
- pid = pyfile.package.data["id"]
- self.list.packager.addFileToPackage(pyfile.package.data["id"], self.list.collector.popFile(newFile))
- newLinks += 1
- self.list.packager.pushPackage2Queue(pid)
-
- self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"])
-
- if newLinks:
- self.parent.logger.info(_("Parsed links from %s: %i") % (pyfile.status.filename, newLinks))
- else:
- self.parent.logger.info(_("No links in %s") % pyfile.status.filename)
- #~ self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.id)
- #~ for link in pyfile.plugin.links:
- #~ id = self.list.collector.addLink(link)
- #~ pyfile.packager.pullOutPackage(pyfile.package.id)
- #~ pyfile.packager.addFileToPackage(pyfile.package.id, pyfile.collector.popFile(id))
- else:
- packFinished = True
- for packfile in pyfile.package.files:
- if packfile.status.type != "finished":
- packFinished = False
- break
-
- self.parent.logger.info(_("Download finished: %s") % pyfile.url)
- if packFinished:
- self.parent.logger.info(_("Package finished: %s") % pyfile.package.data['package_name'])
- self.parent.hookManager.packageFinished(pyfile.package)
-
- elif pyfile.status.type == "reconnected":
- pyfile.plugin.req.init_curl()
-
- elif pyfile.status.type == "failed":
- self.parent.logger.warning(_("Download failed: %s | %s") % (pyfile.url, pyfile.status.error))
- with open(self.parent.config['general']['failed_file'], 'a') as f:
- f.write(pyfile.url + "\n")
-
- elif pyfile.status.type == "aborted":
- self.parent.logger.info(_("Download aborted: %s") % pyfile.url)
-
- self.list.save()
-
- self.parent.hookManager.downloadFinished(pyfile)
-
- self.lock.release()
- return True
-
- def initReconnect(self):
- """initialise a reonnect"""
- if not self.parent.config['reconnect']['activated'] or self.reconnecting or not self.parent.server_methods.is_time_reconnect():
- return False
-
- if not exists(self.parent.config['reconnect']['method']):
- self.parent.logger.info(self.parent.config['reconnect']['method'] + " not found")
- self.parent.config['reconnect']['activated'] = False
- return False
-
- self.lock.acquire()
-
- if self.checkReconnect():
- self.reconnecting = True
- self.reconnect()
- sleep(1.1)
-
- self.reconnecting = False
- self.lock.release()
- return True
-
- self.lock.release()
- return False
-
- def checkReconnect(self):
- """checks if all files want reconnect"""
-
- if not self.py_downloading:
- return False
-
- i = 0
- for obj in self.py_downloading:
- if obj.status.want_reconnect:
- i += 1
-
- if len(self.py_downloading) == i:
- return True
- else:
- return False
-
- def reconnect(self):
- self.parent.logger.info(_("Starting reconnect"))
- ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1)
- self.parent.hookManager.beforeReconnecting(ip)
- reconn = subprocess.Popen(self.parent.config['reconnect']['method'])#, stdout=subprocess.PIPE)
- reconn.wait()
- sleep(1)
- ip = ""
- while ip == "":
- try:
- ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #versuchen neue ip aus zu lesen
- except:
- ip = ""
- sleep(1)
- self.parent.hookManager.afterReconnecting(ip)
- self.parent.logger.info(_("Reconnected, new IP: %s") % ip)
+from threading import Event
+import PluginThread
+
+########################################################################
+class ThreadManager:
+ """manages the download threads, assign jobs, reconnect etc"""
+
+ #----------------------------------------------------------------------
+ def __init__(self, core):
+ """Constructor"""
+ self.core = core
+ self.log = core.log
+
+ self.threads = [] # thread list
+ self.localThreads = [] #hook+decrypter threads
+
+ self.pause = True
+
+ self.reconnecting = Event()
+ self.reconnecting.clear()
+
+ for i in range(0, self.core.config.get("general","max_downloads") ):
+ self.createThread()
+
+
+
+ #----------------------------------------------------------------------
+ def createThread(self):
+ """create a download thread"""
+
+ thread = PluginThread.DownloadThread(self)
+ self.threads.append(thread)
+
+ #----------------------------------------------------------------------
+ def downloadingIds(self):
+ """get a list of the currently downloading pyfile's ids"""
+ return [x.active.id for x in self.threads if x.active]
+
+ #----------------------------------------------------------------------
+ def processingIds(self):
+ """get a id list of all pyfiles processed"""
+ return [x.active.id for x in self.threads+self.localThreads if x.active]
+
+
+ #----------------------------------------------------------------------
+ def work(self):
+ """run all task which have to be done (this is for repetivive call by core)"""
+
+ self.checkReconnect()
+ self.checkThreadCount()
+ self.assingJob()
+
+ #----------------------------------------------------------------------
+ def checkReconnect(self):
+ """checks if reconnect needed"""
+ pass
+
+ #----------------------------------------------------------------------
+ def checkThreadCount(self):
+ """checks if there are need for increasing or reducing thread count"""
+
+ if len(self.threads) == self.core.config.get("general", "max_downloads"):
+ return True
+ elif len(self.threads) < self.core.config.get("general", "max_downloads"):
+ self.createThread()
+ else:
+ #@TODO: close thread
+ pass
+
+
+ #----------------------------------------------------------------------
+ def assingJob(self):
+ """assing a job to a thread if possible"""
+
+ if self.pause: return
+
+ free = [x for x in self.threads if not x.active]
+
+ if free:
+ thread = free[0]
+
+ occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ]
+ occ.sort()
+ occ = set(occ)
+ job = self.core.files.getJob(tuple(occ))
+ if job:
+ job.initPlugin()
+ thread.put(job)
+
+
+
+
- def stopAllDownloads(self):
- self.pause = True
- for pyfile in self.py_downloading:
- pyfile.plugin.req.abort = True
+ \ No newline at end of file