diff options
author | mkaay <mkaay@mkaay.de> | 2010-08-07 17:40:43 +0200 |
---|---|---|
committer | mkaay <mkaay@mkaay.de> | 2010-08-07 17:40:43 +0200 |
commit | afb5e3371a9b43dff97131440affcc2c68ec5593 (patch) | |
tree | 7d2c9f1b8a016fc115881d607fcdeb2c12b30703 /module/ThreadManager.py | |
parent | hook improvements (diff) | |
download | pyload-afb5e3371a9b43dff97131440affcc2c68ec5593.tar.xz |
file info prefetching (RapidshareCom UploadedTo), download folder fix, SerienjunkiesOrg fix
Diffstat (limited to 'module/ThreadManager.py')
-rw-r--r-- | module/ThreadManager.py | 300 |
1 files changed, 157 insertions, 143 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py index 1db9ea5ba..1e4b8ac2b 100644 --- a/module/ThreadManager.py +++ b/module/ThreadManager.py @@ -30,152 +30,166 @@ import PluginThread ######################################################################## class ThreadManager: - """manages the download threads, assign jobs, reconnect etc""" + """manages the download threads, assign jobs, reconnect etc""" - #---------------------------------------------------------------------- - def __init__(self, core): - """Constructor""" - self.core = core - self.log = core.log - - self.threads = [] # thread list - self.localThreads = [] #hook+decrypter threads - - self.pause = True - - self.reconnecting = Event() - self.reconnecting.clear() - - for i in range(0, self.core.config.get("general","max_downloads") ): - self.createThread() - - - - #---------------------------------------------------------------------- - def createThread(self): - """create a download thread""" - - thread = PluginThread.DownloadThread(self) - self.threads.append(thread) - - #---------------------------------------------------------------------- - def downloadingIds(self): - """get a list of the currently downloading pyfile's ids""" - return [x.active.id for x in self.threads if x.active and x.active != "quit"] - - #---------------------------------------------------------------------- - def processingIds(self): - """get a id list of all pyfiles processed""" - return [x.active.id for x in self.threads+self.localThreads if x.active and x.active != "quit"] - - - #---------------------------------------------------------------------- - def work(self): - """run all task which have to be done (this is for repetivive call by core)""" - - self.tryReconnect() - self.checkThreadCount() - self.assignJob() - - #---------------------------------------------------------------------- - def tryReconnect(self): - """checks if reconnect needed""" - - if not (self.core.server_methods.is_time_reconnect() and self.core.config["reconnect"]["activated"] ): - return False - - active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] + #---------------------------------------------------------------------- + def __init__(self, core): + """Constructor""" + self.core = core + self.log = core.log + + self.threads = [] # thread list + self.localThreads = [] #hook+decrypter threads + + self.infoThread = PluginThread.InfoThread(self) + + self.pause = True + + self.reconnecting = Event() + self.reconnecting.clear() + + for i in range(0, self.core.config.get("general","max_downloads") ): + self.createThread() + + + + #---------------------------------------------------------------------- + def createThread(self): + """create a download thread""" + + thread = PluginThread.DownloadThread(self) + self.threads.append(thread) + + #---------------------------------------------------------------------- + def downloadingIds(self): + """get a list of the currently downloading pyfile's ids""" + return [x.active.id for x in self.threads if x.active and x.active != "quit"] + + #---------------------------------------------------------------------- + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.active.id for x in self.threads+self.localThreads if x.active and x.active != "quit"] + + + #---------------------------------------------------------------------- + def work(self): + """run all task which have to be done (this is for repetivive call by core)""" + + self.tryReconnect() + self.checkThreadCount() + self.assignJob() + + #---------------------------------------------------------------------- + def tryReconnect(self): + """checks if reconnect needed""" + + if not (self.core.server_methods.is_time_reconnect() and self.core.config["reconnect"]["activated"] ): + return False + + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - if active.count(True) > 0 and len(active) == active.count(True): - - if not exists(self.core.config['reconnect']['method']): - if exists(join(pypath, self.core.config['reconnect']['method'])): - self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) - else: - self.core.config["reconnect"]["activated"] = False - self.log.warning(_("Reconnect script not found!")) - return - - - self.reconnecting.set() - - #Do reconnect - self.log.info(_("Starting reconnect")) + if active.count(True) > 0 and len(active) == active.count(True): + + if not exists(self.core.config['reconnect']['method']): + if exists(join(pypath, self.core.config['reconnect']['method'])): + self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) + else: + self.core.config["reconnect"]["activated"] = False + self.log.warning(_("Reconnect script not found!")) + return + + + self.reconnecting.set() + + #Do reconnect + self.log.info(_("Starting reconnect")) - - while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: - sleep(0.25) - - - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) - - self.core.hookManager.beforeReconnecting(ip) - reconn = Popen(self.core.config['reconnect']['method'])#, stdout=subprocess.PIPE) - reconn.wait() - sleep(1) - ip = "" - while ip == "": - try: - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip - except: - ip = "" - sleep(1) - self.core.hookManager.afterReconnecting(ip) - - self.log.info(_("Reconnected, new IP: %s") % ip) - - - self.reconnecting.clear() - - #---------------------------------------------------------------------- - def checkThreadCount(self): - """checks if there are need for increasing or reducing thread count""" - - if len(self.threads) == self.core.config.get("general", "max_downloads"): - return True - elif len(self.threads) < self.core.config.get("general", "max_downloads"): - self.createThread() - else: - #@TODO: close thread - pass - - - #---------------------------------------------------------------------- - def assignJob(self): - """assing a job to a thread if possible""" - - if self.pause: return - - free = [x for x in self.threads if not x.active] + + while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: + sleep(0.25) + + + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) + + self.core.hookManager.beforeReconnecting(ip) + reconn = Popen(self.core.config['reconnect']['method'])#, stdout=subprocess.PIPE) + reconn.wait() + sleep(1) + ip = "" + while ip == "": + try: + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip + except: + ip = "" + sleep(1) + self.core.hookManager.afterReconnecting(ip) + + self.log.info(_("Reconnected, new IP: %s") % ip) + + + self.reconnecting.clear() + + #---------------------------------------------------------------------- + def checkThreadCount(self): + """checks if there are need for increasing or reducing thread count""" + + if len(self.threads) == self.core.config.get("general", "max_downloads"): + return True + elif len(self.threads) < self.core.config.get("general", "max_downloads"): + self.createThread() + else: + #@TODO: close thread + pass + + + #---------------------------------------------------------------------- + def assignJob(self): + """assing a job to a thread if possible""" + + if self.pause: return + + free = [x for x in self.threads if not x.active] - - occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ] - occ.sort() - occ = tuple(set(occ)) - job = self.core.files.getJob(occ) - if job: - try: - job.initPlugin() - except Exception, e: - self.log.critical(str(e)) - if self.core.debug: - print_exc() - - if job.plugin.__type__ == "hoster": - if free: - thread = free[0] - thread.put(job) - else: - #put job back - self.core.files.jobCache[occ].append(job.id) - - else: - thread = PluginThread.DecrypterThread(self, job) - - - - - + + occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ] + occ.sort() + occ = tuple(set(occ)) + job = self.core.files.getJob(occ) + if job: + try: + job.initPlugin() + except Exception, e: + self.log.critical(str(e)) + if self.core.debug: + print_exc() + + if job.plugin.__type__ == "hoster": + if free: + thread = free[0] + thread.put(job) + else: + #put job back + self.core.files.jobCache[occ].append(job.id) + + else: + thread = PluginThread.DecrypterThread(self, job) + + job = self.core.files.getInfoJob() + if job: + try: + job.initPlugin() + except Exception, e: + self.log.critical(str(e)) + if self.core.debug: + print_exc() + + if job.plugin.__type__ == "hoster": + self.infoThread.put(job) + + + + + + - |