diff options
Diffstat (limited to 'module/thread_list.py')
-rw-r--r-- | module/thread_list.py | 175 |
1 files changed, 148 insertions, 27 deletions
diff --git a/module/thread_list.py b/module/thread_list.py index d4f2dac97..1a66bf6f5 100644 --- a/module/thread_list.py +++ b/module/thread_list.py @@ -18,12 +18,12 @@ # ### from __future__ import with_statement +from os.path import exists import re import subprocess +from threading import RLock, Thread import time import urllib2 -from os.path import exists -from threading import RLock from download_thread import Download_Thread @@ -40,6 +40,8 @@ class Thread_List(object): self.reconnecting = False self.select_thread() + if self.parent.config['general']['download_speed_limit'] != 0: + self.speedManager = self.SpeedManager(self) def create_thread(self): """ creates thread for Py_Load_File and append thread to self.threads @@ -60,7 +62,7 @@ class Thread_List(object): def get_job(self): """return job if suitable, otherwise send thread idle""" - if not self.parent.is_dltime() or self.pause or self.reconnecting or not self.list.files: #conditions when threads dont download + if not self.parent.server_methods.is_time_download() or self.pause or self.reconnecting or self.list.queueEmpty(): #conditions when threads dont download return None self.init_reconnect() @@ -68,16 +70,19 @@ class Thread_List(object): self.lock.acquire() pyfile = None - for i in range(len(self.list.files)): - if not self.list.files[i].modul.__name__ in self.occ_plugins: - pyfile = self.list.files.pop(i) - break + pyfiles = self.list.getDownloadList(self.occ_plugins) - if pyfile: + if pyfiles: + pyfile = pyfiles[0] self.py_downloading.append(pyfile) + self.parent.hookManager.downloadStarts(pyfile) if not pyfile.plugin.multi_dl: self.occ_plugins.append(pyfile.modul.__name__) - self.parent.logger.info('Download starts: ' + pyfile.url) + pyfile.active = True + if pyfile.plugin.props['type'] == "container": + self.parent.logger.info('Get links from: ' + pyfile.url) + else: + self.parent.logger.info('Download starts: ' + pyfile.url) self.lock.release() return pyfile @@ -89,6 +94,8 @@ class Thread_List(object): if not pyfile.plugin.multi_dl: self.occ_plugins.remove(pyfile.modul.__name__) + + pyfile.active = False if pyfile.plugin.req.curl and not pyfile.status == "reconnected": try: @@ -99,41 +106,65 @@ class Thread_List(object): self.py_downloading.remove(pyfile) if pyfile.status.type == "finished": - self.parent.logger.info('Download finished: ' + pyfile.url + ' @' + str(pyfile.status.get_speed()) + 'kb/s') - - self.list.remove(pyfile) - if pyfile.plugin.props['type'] == "container": - self.list.extend(pyfile.plugin.links) - - - elif pyfile.status.type == "reconnected":#put it back in queque + newLinks = 0 + if pyfile.plugin.links: + if isinstance(pyfile.plugin.links, dict): + packmap = {} + for packname in pyfile.plugin.links.keys(): + packmap[packname] = self.list.packager.addNewPackage(packname) + for packname, links in pyfile.plugin.links.items(): + pid = packmap[packname] + for link in links: + newFile = self.list.collector.addLink(link) + self.list.packager.addFileToPackage(pid, self.list.collector.popFile(newFile)) + newLinks += 1 + else: + for link in pyfile.plugin.links: + newFile = self.list.collector.addLink(link) + self.list.packager.addFileToPackage(pyfile.package.data["id"], self.list.collector.popFile(newFile)) + newLinks += 1 + #self.list.packager.pushPackage2Queue(pyfile.package.data["id"]) + self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.data["id"]) + + if newLinks: + self.parent.logger.info("Parsed links from %s: %i" % (pyfile.status.filename, newLinks)) + else: + self.parent.logger.info("No links in %s" % pyfile.status.filename) + #~ self.list.packager.removeFileFromPackage(pyfile.id, pyfile.package.id) + #~ for link in pyfile.plugin.links: + #~ id = self.list.collector.addLink(link) + #~ pyfile.packager.pullOutPackage(pyfile.package.id) + #~ pyfile.packager.addFileToPackage(pyfile.package.id, pyfile.collector.popFile(id)) + else: + self.parent.logger.info("Download finished: %s" % pyfile.url) + + elif pyfile.status.type == "reconnected": pyfile.plugin.req.init_curl() - self.list.files.insert(0, pyfile) elif pyfile.status.type == "failed": - self.parent.logger.warning("Download failed: " + pyfile.url+ " | " + pyfile.status.error) + self.parent.logger.warning("Download failed: " + pyfile.url + " | " + pyfile.status.error) with open(self.parent.config['general']['failed_file'], 'a') as f: f.write(pyfile.url + "\n") - self.list.remove(pyfile) elif pyfile.status.type == "aborted": self.parent.logger.info("Download aborted: " + pyfile.url) - self.list.remove(pyfile) self.list.save() + self.parent.hookManager.downloadFinished(pyfile) + self.lock.release() return True def init_reconnect(self): """initialise a reonnect""" - if not self.parent.config['general']['use_reconnect'] or self.reconnecting or not self.parent.is_reconnect_time(): + if not self.parent.config['reconnect']['activated'] or self.reconnecting or not self.parent.server_methods.is_time_reconnect(): return False - if not exists(self.parent.config['general']['reconnect_method']): - self.parent.logger.info(self.parent.config['general']['reconnect_method'] + " not found") - self.parent.config['general']['use_reconnect'] = False + if not exists(self.parent.config['reconnect']['method']): + self.parent.logger.info(self.parent.config['reconnect']['method'] + " not found") + self.parent.config['reconnect']['activated'] = False return False self.lock.acquire() @@ -167,14 +198,104 @@ class Thread_List(object): return False def reconnect(self): - reconn = subprocess.Popen(self.parent.config['general']['reconnect_method']) + self.parent.logger.info("Start reconnect") + ip = re.match(".*Current IP Address: (.*)</body>.*", urllib2.urlopen("http://checkip.dyndns.org/").read()).group(1) + self.parent.hookManager.beforeReconnecting(ip) + reconn = subprocess.Popen(self.parent.config['reconnect']['method'])#, stdout=subprocess.PIPE) reconn.wait() time.sleep(1) ip = "" - while ip == "": #solange versuch bis neue ip ausgelesen + while ip == "": try: ip = re.match(".*Current IP Address: (.*)</body>.*", urllib2.urlopen("http://checkip.dyndns.org/").read()).group(1) #versuchen neue ip aus zu lesen except: ip = "" time.sleep(1) + self.parent.hookManager.afterReconnecting(ip) self.parent.logger.info("Reconnected, new IP: " + ip) + + def stopAllDownloads(self): + self.pause = True + for pyfile in self.py_downloading: + pyfile.plugin.req.abort = True + + class SpeedManager(Thread): + def __init__(self, parent): + Thread.__init__(self) + self.parent = parent + self.running = True + self.lastSlowCheck = 0.0 + + stat = {} + stat["slow_downloads"] = None + stat["each_speed"] = None + stat["each_speed_optimized"] = None + self.stat = stat + + self.slowCheckInterval = 60 + self.slowCheckTestTime = 25 + + self.logger = self.parent.parent.logger + self.start() + + def run(self): + while self.running: + time.sleep(1) + self.manageSpeed() + + def getMaxSpeed(self): + return self.parent.parent.getMaxSpeed() + + def manageSpeed(self): + maxSpeed = self.getMaxSpeed() + if maxSpeed <= 0: + for thread in self.parent.py_downloading: + thread.plugin.req.speedLimitActive = False + return + threads = self.parent.py_downloading + threadCount = len(threads) + if threadCount <= 0: + return + eachSpeed = maxSpeed/threadCount + + currentOverallSpeed = 0 + restSpeed = maxSpeed - currentOverallSpeed + speeds = [] + for thread in threads: + currentOverallSpeed += thread.plugin.req.dl_speed + speeds.append((thread.plugin.req.dl_speed, thread.plugin.req.averageSpeed, thread)) + thread.plugin.req.speedLimitActive = True + + if currentOverallSpeed+50 < maxSpeed: + for thread in self.parent.py_downloading: + thread.plugin.req.speedLimitActive = False + return + + slowCount = 0 + slowSpeed = 0 + if self.lastSlowCheck + self.slowCheckInterval + self.slowCheckTestTime < time.time(): + self.lastSlowCheck = time.time() + if self.lastSlowCheck + self.slowCheckInterval < time.time() < self.lastSlowCheck + self.slowCheckInterval + self.slowCheckTestTime: + for speed in speeds: + speed[2].plugin.req.isSlow = False + else: + for speed in speeds: + if speed[0] <= eachSpeed-7: + if speed[1] < eachSpeed-15: + if speed[2].plugin.req.dl_time > 0 and speed[2].plugin.req.dl_time+30 < time.time(): + speed[2].plugin.req.isSlow = True + if not speed[1]-5 < speed[2].plugin.req.maxSpeed/1024 < speed[1]+5: + speed[2].plugin.req.maxSpeed = (speed[1]+10)*1024 + if speed[2].plugin.req.isSlow: + slowCount += 1 + slowSpeed += speed[2].plugin.req.maxSpeed/1024 + + stat["slow_downloads"] = slowCount + stat["each_speed"] = eachSpeed + eachSpeed = (maxSpeed - slowSpeed) / (threadCount - slowCount) + stat["each_speed_optimized"] = eachSpeed + for speed in speeds: + if speed[2].plugin.req.isSlow: + continue + speed[2].plugin.req.maxSpeed = eachSpeed*1024 + print "max", speed[2].plugin.req.maxSpeed, "current", speed[2].plugin.req.dl_speed |