diff options
| author | 2010-08-07 17:40:43 +0200 | |
|---|---|---|
| committer | 2010-08-07 17:40:43 +0200 | |
| commit | afb5e3371a9b43dff97131440affcc2c68ec5593 (patch) | |
| tree | 7d2c9f1b8a016fc115881d607fcdeb2c12b30703 /module/ThreadManager.py | |
| parent | hook improvements (diff) | |
| download | pyload-afb5e3371a9b43dff97131440affcc2c68ec5593.tar.xz | |
file info prefetching (RapidshareCom UploadedTo), download folder fix, SerienjunkiesOrg fix
Diffstat (limited to 'module/ThreadManager.py')
| -rw-r--r-- | module/ThreadManager.py | 300 | 
1 files changed, 157 insertions, 143 deletions
diff --git a/module/ThreadManager.py b/module/ThreadManager.py index 1db9ea5ba..1e4b8ac2b 100644 --- a/module/ThreadManager.py +++ b/module/ThreadManager.py @@ -30,152 +30,166 @@ import PluginThread  ########################################################################  class ThreadManager: -	"""manages the download threads, assign jobs, reconnect etc""" +    """manages the download threads, assign jobs, reconnect etc""" -	#---------------------------------------------------------------------- -	def __init__(self, core): -		"""Constructor""" -		self.core = core -		self.log = core.log -				 -		self.threads = []  # thread list -		self.localThreads = []  #hook+decrypter threads -		 -		self.pause = True -		 -		self.reconnecting = Event() -		self.reconnecting.clear() -		 -		for i in range(0, self.core.config.get("general","max_downloads") ): -			self.createThread() -		 -		 -		 -	#---------------------------------------------------------------------- -	def createThread(self): -		"""create a download thread""" -		 -		thread = PluginThread.DownloadThread(self)		 -		self.threads.append(thread) -		 -	#---------------------------------------------------------------------- -	def downloadingIds(self): -		"""get a list of the currently downloading pyfile's ids""" -		return [x.active.id for x in self.threads if x.active and x.active != "quit"] -	 -	#---------------------------------------------------------------------- -	def processingIds(self): -		"""get a id list of all pyfiles processed""" -		return [x.active.id for x in self.threads+self.localThreads if x.active and x.active != "quit"] -		 -		 -	#---------------------------------------------------------------------- -	def work(self): -		"""run all task which have to be done (this is for repetivive call by core)""" -				 -		self.tryReconnect() -		self.checkThreadCount() -		self.assignJob() -	 -	#---------------------------------------------------------------------- -	def tryReconnect(self): -		"""checks if reconnect needed""" -		 -		if not (self.core.server_methods.is_time_reconnect() and self.core.config["reconnect"]["activated"] ): -			return False -						 -		active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] +    #---------------------------------------------------------------------- +    def __init__(self, core): +        """Constructor""" +        self.core = core +        self.log = core.log +                 +        self.threads = []  # thread list +        self.localThreads = []  #hook+decrypter threads +         +        self.infoThread = PluginThread.InfoThread(self) +                 +        self.pause = True +         +        self.reconnecting = Event() +        self.reconnecting.clear() +         +        for i in range(0, self.core.config.get("general","max_downloads") ): +            self.createThread() +         +         +         +    #---------------------------------------------------------------------- +    def createThread(self): +        """create a download thread""" +         +        thread = PluginThread.DownloadThread(self)         +        self.threads.append(thread) +         +    #---------------------------------------------------------------------- +    def downloadingIds(self): +        """get a list of the currently downloading pyfile's ids""" +        return [x.active.id for x in self.threads if x.active and x.active != "quit"] +     +    #---------------------------------------------------------------------- +    def processingIds(self): +        """get a id list of all pyfiles processed""" +        return [x.active.id for x in self.threads+self.localThreads if x.active and x.active != "quit"] +         +         +    #---------------------------------------------------------------------- +    def work(self): +        """run all task which have to be done (this is for repetivive call by core)""" +                 +        self.tryReconnect() +        self.checkThreadCount() +        self.assignJob() +     +    #---------------------------------------------------------------------- +    def tryReconnect(self): +        """checks if reconnect needed""" +         +        if not (self.core.server_methods.is_time_reconnect() and self.core.config["reconnect"]["activated"] ): +            return False +                         +        active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] -		if active.count(True) > 0 and len(active) == active.count(True): -		 -			if not exists(self.core.config['reconnect']['method']): -				if exists(join(pypath, self.core.config['reconnect']['method'])): -					self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) -				else: -					self.core.config["reconnect"]["activated"] = False -					self.log.warning(_("Reconnect script not found!")) -					return -				 -				 -			self.reconnecting.set() -			 -			#Do reconnect -			self.log.info(_("Starting reconnect")) +        if active.count(True) > 0 and len(active) == active.count(True): +         +            if not exists(self.core.config['reconnect']['method']): +                if exists(join(pypath, self.core.config['reconnect']['method'])): +                    self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method']) +                else: +                    self.core.config["reconnect"]["activated"] = False +                    self.log.warning(_("Reconnect script not found!")) +                    return +                 +                 +            self.reconnecting.set() +             +            #Do reconnect +            self.log.info(_("Starting reconnect")) -			 -			while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: -				sleep(0.25) -				 -						 -			ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) -			 -			self.core.hookManager.beforeReconnecting(ip) -			reconn = Popen(self.core.config['reconnect']['method'])#, stdout=subprocess.PIPE) -			reconn.wait() -			sleep(1) -			ip = "" -			while ip == "": -					try: -							ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip -					except: -							ip = "" -					sleep(1) -			self.core.hookManager.afterReconnecting(ip) -			 -			self.log.info(_("Reconnected, new IP: %s") % ip) -	 -					 -			self.reconnecting.clear() -	 -	#---------------------------------------------------------------------- -	def checkThreadCount(self): -		"""checks if there are need for increasing or reducing thread count""" -		 -		if len(self.threads) == self.core.config.get("general", "max_downloads"): -			return True -		elif len(self.threads) < self.core.config.get("general", "max_downloads"): -			self.createThread() -		else: -			#@TODO: close thread -			pass -		 -	 -	#---------------------------------------------------------------------- -	def assignJob(self): -		"""assing a job to a thread if possible""" -		 -		if self.pause: return -		 -		free = [x for x in self.threads if not x.active] +             +            while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: +                sleep(0.25) +                 +                         +            ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) +             +            self.core.hookManager.beforeReconnecting(ip) +            reconn = Popen(self.core.config['reconnect']['method'])#, stdout=subprocess.PIPE) +            reconn.wait() +            sleep(1) +            ip = "" +            while ip == "": +                    try: +                            ip = re.match(".*Current IP Address: (.*)</body>.*", getURL("http://checkip.dyndns.org/")).group(1) #get new ip +                    except: +                            ip = "" +                    sleep(1) +            self.core.hookManager.afterReconnecting(ip) +             +            self.log.info(_("Reconnected, new IP: %s") % ip) +     +                     +            self.reconnecting.clear() +     +    #---------------------------------------------------------------------- +    def checkThreadCount(self): +        """checks if there are need for increasing or reducing thread count""" +         +        if len(self.threads) == self.core.config.get("general", "max_downloads"): +            return True +        elif len(self.threads) < self.core.config.get("general", "max_downloads"): +            self.createThread() +        else: +            #@TODO: close thread +            pass +         +     +    #---------------------------------------------------------------------- +    def assignJob(self): +        """assing a job to a thread if possible""" +         +        if self.pause: return +         +        free = [x for x in self.threads if not x.active] -		 -		occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ] -		occ.sort() -		occ = tuple(set(occ)) -		job = self.core.files.getJob(occ) -		if job: -			try: -				job.initPlugin() -			except Exception, e: -				self.log.critical(str(e)) -				if self.core.debug: -				    print_exc() -			 -			if job.plugin.__type__ == "hoster": -				if free: -					thread = free[0] -					thread.put(job) -				else: -					#put job back -					self.core.files.jobCache[occ].append(job.id) -					 -			else: -				thread = PluginThread.DecrypterThread(self, job) -					 -	 -		 -		 -		 +         +        occ = [x.active.pluginname for x in self.threads if x.active and not x.active.plugin.multiDL ] +        occ.sort() +        occ = tuple(set(occ)) +        job = self.core.files.getJob(occ) +        if job: +            try: +                job.initPlugin() +            except Exception, e: +                self.log.critical(str(e)) +                if self.core.debug: +                    print_exc() +             +            if job.plugin.__type__ == "hoster": +                if free: +                    thread = free[0] +                    thread.put(job) +                else: +                    #put job back +                    self.core.files.jobCache[occ].append(job.id) +                     +            else: +                thread = PluginThread.DecrypterThread(self, job) +         +        job = self.core.files.getInfoJob() +        if job: +            try: +                job.initPlugin() +            except Exception, e: +                self.log.critical(str(e)) +                if self.core.debug: +                    print_exc() +             +            if job.plugin.__type__ == "hoster": +                self.infoThread.put(job) +                     +     +         +         +         +     -	  | 
