diff options
Diffstat (limited to 'module/ThreadManager.py')
-rw-r--r-- | module/ThreadManager.py | 151 |
1 files changed, 78 insertions, 73 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py index b9c407484..a482ff311 100644 --- a/module/ThreadManager.py +++ b/module/ThreadManager.py @@ -37,131 +37,136 @@ class ThreadManager: """Constructor""" self.core = core self.log = core.log - + self.threads = [] # thread list self.localThreads = [] #hook+decrypter threads - + #self.infoThread = PluginThread.InfoThread(self) - + self.pause = True - + self.reconnecting = Event() self.reconnecting.clear() - + for i in range(0, self.core.config.get("general", "max_downloads")): self.createThread() - - - + + + #---------------------------------------------------------------------- def createThread(self): """create a download thread""" - - thread = PluginThread.DownloadThread(self) + + thread = PluginThread.DownloadThread(self) self.threads.append(thread) - + #---------------------------------------------------------------------- def createInfoThread(self, data, pid): """ start a thread whichs fetches online status and other infos data = [ .. () .. ] """ - + PluginThread.InfoThread(self, data, pid) - - + + #---------------------------------------------------------------------- def downloadingIds(self): """get a list of the currently downloading pyfile's ids""" return [x.active.id for x in self.threads if x.active and x.active != "quit"] - + #---------------------------------------------------------------------- def processingIds(self): """get a id list of all pyfiles processed""" return [x.active.id for x in self.threads + self.localThreads if x.active and x.active != "quit"] - - + + #---------------------------------------------------------------------- def work(self): """run all task which have to be done (this is for repetivive call by core)""" - + self.tryReconnect() self.checkThreadCount() self.assignJob() - + #---------------------------------------------------------------------- def tryReconnect(self): """checks if reconnect needed""" - - if not (self.core.server_methods.is_time_reconnect() and self.core.config["reconnect"]["activated"]): + + if not (self.core.config["reconnect"]["activated"] and self.core.server_methods.is_time_reconnect()): return False - + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] - if active.count(True) > 0 and len(active) == active.count(True): - - if not exists(self.core.config['reconnect']['method']): - if exists(join(pypath, self.core.config['reconnect']['method'])): - self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) - else: - self.core.config["reconnect"]["activated"] = False - self.log.warning(_("Reconnect script not found!")) - return - - - self.reconnecting.set() - - #Do reconnect - self.log.info(_("Starting reconnect")) - - - while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: - sleep(0.25) - - - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) - - self.core.hookManager.beforeReconnecting(ip) - reconn = Popen(self.core.config['reconnect']['method'])#, stdout=subprocess.PIPE) - reconn.wait() - sleep(1) - ip = "" - while ip == "": - try: - ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip - except: - ip = "" - sleep(1) - self.core.hookManager.afterReconnecting(ip) - - self.log.info(_("Reconnected, new IP: %s") % ip) - - + if not (active.count(True) > 0 and len(active) == active.count(True)): + return False + + if not exists(self.core.config['reconnect']['method']): + if exists(join(pypath, self.core.config['reconnect']['method'])): + self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) + else: + self.core.config["reconnect"]["activated"] = False + self.log.warning(_("Reconnect script not found!")) + return + + self.reconnecting.set() + + #Do reconnect + self.log.info(_("Starting reconnect")) + + while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: + sleep(0.25) + + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) + + self.core.hookManager.beforeReconnecting(ip) + + self.log.debug(_("Old IP: %s") % ip) + + try: + reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1)#, stdout=subprocess.PIPE) + except: + self.log.warning(_("Failed executing reconnect script!")) + self.core.config["reconnect"]["activated"] = False self.reconnecting.clear() - + return + + reconn.wait() + sleep(1) + ip = "" + while ip == "": + try: + ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip + except: + ip = "" + sleep(1) + self.core.hookManager.afterReconnecting(ip) + + self.log.info(_("Reconnected, new IP: %s") % ip) + + self.reconnecting.clear() + #---------------------------------------------------------------------- def checkThreadCount(self): """checks if there are need for increasing or reducing thread count""" - + if len(self.threads) == self.core.config.get("general", "max_downloads"): return True elif len(self.threads) < self.core.config.get("general", "max_downloads"): self.createThread() else: - #@TODO: close thread + #@TODO: close thread pass - - + + #---------------------------------------------------------------------- def assignJob(self): """assing a job to a thread if possible""" - + if self.pause or not self.core.server_methods.is_time_download(): return - - free = [x for x in self.threads if not x.active] + free = [x for x in self.threads if not x.active] - occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL] occ.sort() occ = tuple(set(occ)) @@ -173,17 +178,17 @@ class ThreadManager: self.log.critical(str(e)) if self.core.debug: print_exc() - + if job.plugin.__type__ == "hoster": if free: thread = free[0] thread.put(job) else: - #put job back + #put job back if not self.core.files.jobCache.has_key(occ): self.core.files.jobCache[occ] = [] self.core.files.jobCache[occ].append(job.id) - + else: thread = PluginThread.DecrypterThread(self, job) |