summaryrefslogtreecommitdiffstats
path: root/pyload/DownloadManager.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/DownloadManager.py')
-rw-r--r--pyload/DownloadManager.py246
1 files changed, 236 insertions, 10 deletions
diff --git a/pyload/DownloadManager.py b/pyload/DownloadManager.py
index 706f9afeb..04c9f66df 100644
--- a/pyload/DownloadManager.py
+++ b/pyload/DownloadManager.py
@@ -16,18 +16,33 @@
# @author: RaNaN
###############################################################################
+from collections import defaultdict
from threading import Event
+from time import sleep
+from random import sample
+from subprocess import call
+
from ReadWriteLock import ReadWriteLock
-from utils import lock, read_lock, primary_uid
+from Api import DownloadStatus as DS
+
+from datatypes.PyFile import PyFile
+
+from utils import lock, read_lock
+from utils.fs import exists, join, free_space
+
+from network import get_ip
+
from threads.DownloadThread import DownloadThread
from threads.DecrypterThread import DecrypterThread
+
class DownloadManager:
""" Schedules and manages download and decrypter jobs. """
def __init__(self, core):
self.core = core
+ self.log = core.log
#: won't start download when true
self.paused = True
@@ -36,8 +51,10 @@ class DownloadManager:
self.free = []
#: a thread that in working must have a pyfile as active attribute
self.working = []
+ #: holds the decrypter threads
+ self.decrypter = []
- #: indicates when reconnect has occured
+ #: indicates when reconnect has occurred
self.reconnecting = Event()
self.reconnecting.clear()
@@ -47,24 +64,233 @@ class DownloadManager:
def done(self, thread):
""" Switch thread from working to free state """
self.working.remove(thread)
- self.free.append(thread)
+ # only download threads will be re-used
+ if isinstance(thread, DownloadThread):
+ self.free.append(thread)
+ thread.isWorking.clear()
+
+ @lock
+ def stop(self, thread):
+ """ Removes a thread from all lists """
+
+ if thread in self.free:
+ self.free.remove(thread)
+
+ if thread in self.working:
+ self.working.remove(thread)
+
+ @lock
+ def startDownloadThread(self, info):
+ """ Use a free dl thread or create a new one """
+ if self.free:
+ thread = self.free[0]
+ del self.free[0]
+ else:
+ thread = DownloadThread(self)
+
+ thread.put(PyFile.fromInfoData(self.core.files, info))
+
+ # wait until it picked up the task
+ thread.isWorking.wait()
+ self.working.append(thread)
+
+ @lock
+ def startDecrypterThread(self, info):
+ """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
+ self.decrypter.append(DecrypterThread(self, [(info.plugin, info.url)], info.pid))
@read_lock
- def activeDownloads(self, user):
+ def activeDownloads(self, uid=None):
""" retrieve pyfiles of running downloads """
- uid = primary_uid(user)
- return [x.active for x in self.working if uid is None or x.active.owner == uid]
+ return [x.active for x in self.working
+ if uid is None or x.active.owner == uid]
- def getProgressList(self, user):
+ @read_lock
+ def getProgressList(self, uid):
""" Progress of all running downloads """
- return [p.getProgressInfo() for p in self.activeDownloads(user)]
+ # decrypter progress could be none
+ return filter(lambda x: x is not None,
+ [p.getProgress() for p in self.working + self.decrypter
+ if uid is None or p.owner == uid])
- def canDownload(self, user):
- """ check if a user is eligible to start a new download """
+ def processingIds(self):
+ """get a id list of all pyfiles processed"""
+ return [x.id for x in self.activeDownloads(None)]
+ @read_lock
def abort(self):
""" Cancels all downloads """
+ # TODO: may dead lock
+ for t in self.working:
+ t.active.abortDownload()
+
+ @read_lock
+ def shutdown(self):
+ """ End all threads """
+ for thread in self.working + self.free:
+ thread.put("quit")
def work(self):
""" main routine that does the periodical work """
+ self.tryReconnect()
+
+ if free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 < \
+ self.core.config["general"]["min_free_space"]:
+ self.log.warning(_("Not enough space left on device"))
+ self.paused = True
+
+ if self.paused or not self.core.api.isTimeDownload():
+ return False
+
+ # at least one thread want reconnect and we are supposed to wait
+ if self.core.config['reconnect']['wait'] and self.wantReconnect() > 1:
+ return False
+
+ self.assignJobs()
+
+ def assignJobs(self):
+ """ Load jobs from db and try to assign them """
+
+ limit = self.core.config['download']['max_downloads'] - len(self.activeDownloads())
+ slots = self.getRemainingPluginSlots()
+ occ = tuple([plugin for plugin, v in slots.iteritems() if v == 0])
+ jobs = self.core.files.getJobs(occ)
+
+ # map plugin to list of jobs
+ plugins = defaultdict(list)
+
+ for uid, info in jobs.items():
+ # check the quota of each user and filter
+ quota = self.core.api.calcQuota(uid)
+ if -1 < quota < info.size:
+ del jobs[uid]
+
+ plugins[info.download.plugin].append(info)
+
+ for plugin, jobs in plugins.iteritems():
+ # we know exactly the number of remaining jobs
+ # or only can start one job if limit is not known
+ to_schedule = slots[plugin] if plugin in slots else 1
+ # start all chosen jobs
+ for job in self.chooseJobs(jobs, to_schedule):
+ # if the job was started the limit will be reduced
+ if self.startJob(job, limit):
+ limit -= 1
+
+ def chooseJobs(self, jobs, k):
+ """ make a fair choice of which k jobs to start """
+ # TODO: prefer admins, make a fairer choice?
+ if k >= len(jobs):
+ return jobs
+
+ return sample(jobs, k)
+
+ def startJob(self, info, limit):
+ """ start a download or decrypter thread with given file info """
+
+ plugin = self.core.pluginManager.findPlugin(info.download.plugin)
+ # this plugin does not exits
+ if plugin is None:
+ self.log.error(_("Plugin '%s' does not exists") % info.download.plugin)
+ self.core.files.setDownloadStatus(info.fid, DS.Failed)
+ return False
+
+ if plugin == "hoster":
+ # this job can't be started
+ if limit == 0:
+ return False
+
+ self.startDownloadThread(info)
+ return True
+
+ elif plugin == "crypter":
+ self.startDecrypterThread(info)
+ else:
+ self.log.error(_("Plugin type '%s' is can be used for downloading") % plugin)
+
+ return False
+
+ @read_lock
+ def tryReconnect(self):
+ """checks if reconnect needed"""
+
+ if not self.core.config["reconnect"]["activated"] or not self.core.api.isTimeReconnect():
+ return False
+
+ # only reconnect when all threads are ready
+ if not (0 < self.wantReconnect() == len(self.working)):
+ return False
+
+ if not exists(self.core.config['reconnect']['method']):
+ if exists(join(pypath, self.core.config['reconnect']['method'])):
+ self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
+ else:
+ self.core.config["reconnect"]["activated"] = False
+ self.log.warning(_("Reconnect script not found!"))
+ return
+
+ self.reconnecting.set()
+
+ self.log.info(_("Starting reconnect"))
+
+ # wait until all thread got the event
+ while [x.active.plugin.waiting for x in self.working].count(True) != 0:
+ sleep(0.25)
+
+ old_ip = get_ip()
+
+ self.core.evm.dispatchEvent("reconnect:before", old_ip)
+ self.log.debug("Old IP: %s" % old_ip)
+
+ try:
+ call(self.core.config['reconnect']['method'], shell=True)
+ except:
+ self.log.warning(_("Failed executing reconnect script!"))
+ self.core.config["reconnect"]["activated"] = False
+ self.reconnecting.clear()
+ self.core.print_exc()
+ return
+
+ sleep(1)
+ ip = get_ip()
+ self.core.evm.dispatchEvent("reconnect:after", ip)
+
+ if not old_ip or old_ip == ip:
+ self.log.warning(_("Reconnect not successful"))
+ else:
+ self.log.info(_("Reconnected, new IP: %s") % ip)
+
+ self.reconnecting.clear()
+
+ @read_lock
+ def wantReconnect(self):
+ """ number of downloads that are waiting for reconnect """
+ active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.working]
+ return active.count(True)
+
+ @read_lock
+ def getRemainingPluginSlots(self):
+ """ dict of plugin names mapped to remaining dls """
+ occ = defaultdict(lambda: -1)
+ # decrypter are treated as occupied
+ for p in self.decrypter:
+ progress = p.getProgressInfo()
+ if progress:
+ occ[progress.plugin] = 0
+
+ # get all default dl limits
+ for t in self.working:
+ if not t.active.hasPlugin(): continue
+ limit = t.active.plugin.getDownloadLimit()
+ if limit < 0: continue
+ occ[t.active.pluginname] = limit
+
+ # subtract with running downloads
+ for t in self.working:
+ if not t.active.hasPlugin(): continue
+ plugin = t.active.pluginname
+ if plugin in occ:
+ occ[plugin] -= 1
+
+ return occ \ No newline at end of file