diff options
Diffstat (limited to 'pyload/DownloadManager.py')
-rw-r--r-- | pyload/DownloadManager.py | 246 |
1 files changed, 236 insertions, 10 deletions
diff --git a/pyload/DownloadManager.py b/pyload/DownloadManager.py index 706f9afeb..04c9f66df 100644 --- a/pyload/DownloadManager.py +++ b/pyload/DownloadManager.py @@ -16,18 +16,33 @@ # @author: RaNaN ############################################################################### +from collections import defaultdict from threading import Event +from time import sleep +from random import sample +from subprocess import call + from ReadWriteLock import ReadWriteLock -from utils import lock, read_lock, primary_uid +from Api import DownloadStatus as DS + +from datatypes.PyFile import PyFile + +from utils import lock, read_lock +from utils.fs import exists, join, free_space + +from network import get_ip + from threads.DownloadThread import DownloadThread from threads.DecrypterThread import DecrypterThread + class DownloadManager: """ Schedules and manages download and decrypter jobs. """ def __init__(self, core): self.core = core + self.log = core.log #: won't start download when true self.paused = True @@ -36,8 +51,10 @@ class DownloadManager: self.free = [] #: a thread that in working must have a pyfile as active attribute self.working = [] + #: holds the decrypter threads + self.decrypter = [] - #: indicates when reconnect has occured + #: indicates when reconnect has occurred self.reconnecting = Event() self.reconnecting.clear() @@ -47,24 +64,233 @@ class DownloadManager: def done(self, thread): """ Switch thread from working to free state """ self.working.remove(thread) - self.free.append(thread) + # only download threads will be re-used + if isinstance(thread, DownloadThread): + self.free.append(thread) + thread.isWorking.clear() + + @lock + def stop(self, thread): + """ Removes a thread from all lists """ + + if thread in self.free: + self.free.remove(thread) + + if thread in self.working: + self.working.remove(thread) + + @lock + def startDownloadThread(self, info): + """ Use a free dl thread or create a new one """ + if self.free: + thread = self.free[0] + del self.free[0] + else: + thread = DownloadThread(self) + + thread.put(PyFile.fromInfoData(self.core.files, info)) + + # wait until it picked up the task + thread.isWorking.wait() + self.working.append(thread) + + @lock + def startDecrypterThread(self, info): + """ Start decrypting of entered data, all links in one package are accumulated to one thread.""" + self.decrypter.append(DecrypterThread(self, [(info.plugin, info.url)], info.pid)) @read_lock - def activeDownloads(self, user): + def activeDownloads(self, uid=None): """ retrieve pyfiles of running downloads """ - uid = primary_uid(user) - return [x.active for x in self.working if uid is None or x.active.owner == uid] + return [x.active for x in self.working + if uid is None or x.active.owner == uid] - def getProgressList(self, user): + @read_lock + def getProgressList(self, uid): """ Progress of all running downloads """ - return [p.getProgressInfo() for p in self.activeDownloads(user)] + # decrypter progress could be none + return filter(lambda x: x is not None, + [p.getProgress() for p in self.working + self.decrypter + if uid is None or p.owner == uid]) - def canDownload(self, user): - """ check if a user is eligible to start a new download """ + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.id for x in self.activeDownloads(None)] + @read_lock def abort(self): """ Cancels all downloads """ + # TODO: may dead lock + for t in self.working: + t.active.abortDownload() + + @read_lock + def shutdown(self): + """ End all threads """ + for thread in self.working + self.free: + thread.put("quit") def work(self): """ main routine that does the periodical work """ + self.tryReconnect() + + if free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024 < \ + self.core.config["general"]["min_free_space"]: + self.log.warning(_("Not enough space left on device")) + self.paused = True + + if self.paused or not self.core.api.isTimeDownload(): + return False + + # at least one thread want reconnect and we are supposed to wait + if self.core.config['reconnect']['wait'] and self.wantReconnect() > 1: + return False + + self.assignJobs() + + def assignJobs(self): + """ Load jobs from db and try to assign them """ + + limit = self.core.config['download']['max_downloads'] - len(self.activeDownloads()) + slots = self.getRemainingPluginSlots() + occ = tuple([plugin for plugin, v in slots.iteritems() if v == 0]) + jobs = self.core.files.getJobs(occ) + + # map plugin to list of jobs + plugins = defaultdict(list) + + for uid, info in jobs.items(): + # check the quota of each user and filter + quota = self.core.api.calcQuota(uid) + if -1 < quota < info.size: + del jobs[uid] + + plugins[info.download.plugin].append(info) + + for plugin, jobs in plugins.iteritems(): + # we know exactly the number of remaining jobs + # or only can start one job if limit is not known + to_schedule = slots[plugin] if plugin in slots else 1 + # start all chosen jobs + for job in self.chooseJobs(jobs, to_schedule): + # if the job was started the limit will be reduced + if self.startJob(job, limit): + limit -= 1 + + def chooseJobs(self, jobs, k): + """ make a fair choice of which k jobs to start """ + # TODO: prefer admins, make a fairer choice? + if k >= len(jobs): + return jobs + + return sample(jobs, k) + + def startJob(self, info, limit): + """ start a download or decrypter thread with given file info """ + + plugin = self.core.pluginManager.findPlugin(info.download.plugin) + # this plugin does not exits + if plugin is None: + self.log.error(_("Plugin '%s' does not exists") % info.download.plugin) + self.core.files.setDownloadStatus(info.fid, DS.Failed) + return False + + if plugin == "hoster": + # this job can't be started + if limit == 0: + return False + + self.startDownloadThread(info) + return True + + elif plugin == "crypter": + self.startDecrypterThread(info) + else: + self.log.error(_("Plugin type '%s' is can be used for downloading") % plugin) + + return False + + @read_lock + def tryReconnect(self): + """checks if reconnect needed""" + + if not self.core.config["reconnect"]["activated"] or not self.core.api.isTimeReconnect(): + return False + + # only reconnect when all threads are ready + if not (0 < self.wantReconnect() == len(self.working)): + return False + + if not exists(self.core.config['reconnect']['method']): + if exists(join(pypath, self.core.config['reconnect']['method'])): + self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) + else: + self.core.config["reconnect"]["activated"] = False + self.log.warning(_("Reconnect script not found!")) + return + + self.reconnecting.set() + + self.log.info(_("Starting reconnect")) + + # wait until all thread got the event + while [x.active.plugin.waiting for x in self.working].count(True) != 0: + sleep(0.25) + + old_ip = get_ip() + + self.core.evm.dispatchEvent("reconnect:before", old_ip) + self.log.debug("Old IP: %s" % old_ip) + + try: + call(self.core.config['reconnect']['method'], shell=True) + except: + self.log.warning(_("Failed executing reconnect script!")) + self.core.config["reconnect"]["activated"] = False + self.reconnecting.clear() + self.core.print_exc() + return + + sleep(1) + ip = get_ip() + self.core.evm.dispatchEvent("reconnect:after", ip) + + if not old_ip or old_ip == ip: + self.log.warning(_("Reconnect not successful")) + else: + self.log.info(_("Reconnected, new IP: %s") % ip) + + self.reconnecting.clear() + + @read_lock + def wantReconnect(self): + """ number of downloads that are waiting for reconnect """ + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.working] + return active.count(True) + + @read_lock + def getRemainingPluginSlots(self): + """ dict of plugin names mapped to remaining dls """ + occ = defaultdict(lambda: -1) + # decrypter are treated as occupied + for p in self.decrypter: + progress = p.getProgressInfo() + if progress: + occ[progress.plugin] = 0 + + # get all default dl limits + for t in self.working: + if not t.active.hasPlugin(): continue + limit = t.active.plugin.getDownloadLimit() + if limit < 0: continue + occ[t.active.pluginname] = limit + + # subtract with running downloads + for t in self.working: + if not t.active.hasPlugin(): continue + plugin = t.active.pluginname + if plugin in occ: + occ[plugin] -= 1 + + return occ
\ No newline at end of file |