summaryrefslogtreecommitdiffstats
path: root/pyload/threads/ThreadManager.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/threads/ThreadManager.py')
-rw-r--r--pyload/threads/ThreadManager.py220
1 files changed, 8 insertions, 212 deletions
diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py
index 298b0402d..f6cb3daea 100644
--- a/pyload/threads/ThreadManager.py
+++ b/pyload/threads/ThreadManager.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
###############################################################################
-# Copyright(c) 2008-2013 pyLoad Team
+# Copyright(c) 2008-2014 pyLoad Team
# http://www.pyload.org
#
# This file is part of pyLoad.
@@ -16,22 +16,11 @@
# @author: RaNaN
###############################################################################
-from os.path import exists, join
-import re
-from subprocess import Popen
-from threading import Event, RLock
-from time import sleep, time
-from traceback import print_exc
-from random import choice
+from threading import RLock
+from time import time
-from pyload.datatypes.PyFile import PyFile
from pyload.datatypes.OnlineCheck import OnlineCheck
-from pyload.network.RequestFactory import getURL
-from pyload.utils import lock, uniqify, to_list
-from pyload.utils.fs import free_space
-
-from DecrypterThread import DecrypterThread
-from DownloadThread import DownloadThread
+from pyload.utils import lock, to_list
from InfoThread import InfoThread
@@ -44,13 +33,6 @@ class ThreadManager:
self.log = core.log
self.threads = [] # thread list
- self.localThreads = [] #addon+decrypter threads
-
- self.pause = True
-
- self.reconnecting = Event()
- self.reconnecting.clear()
- self.downloaded = 0 #number of files downloaded since last cleanup
self.lock = RLock()
@@ -67,24 +49,15 @@ class ThreadManager:
# timeout for cache purge
self.timestamp = 0
- for i in range(self.core.config.get("download", "max_downloads")):
- self.createThread()
-
- def createThread(self):
- """create a download thread"""
-
- thread = DownloadThread(self)
- self.threads.append(thread)
-
@lock
def addThread(self, thread):
- self.localThreads.append(thread)
+ self.threads.append(thread)
@lock
def removeThread(self, thread):
""" Remove a thread from the local list """
- if thread in self.localThreads:
- self.localThreads.remove(thread)
+ if thread in self.threads:
+ self.threads.remove(thread)
@lock
def createInfoThread(self, data, pid):
@@ -108,11 +81,6 @@ class ThreadManager:
return rid
@lock
- def createDecryptThread(self, data, pid):
- """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
- if data: DecrypterThread(self, data, pid)
-
- @lock
def getInfoResult(self, rid):
return self.infoResults.get(rid)
@@ -120,14 +88,10 @@ class ThreadManager:
self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner)
oc.update(result)
- def getActiveDownloads(self, user=None):
- # TODO: user context
- return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)]
-
def getProgressList(self, user=None):
info = []
- for thread in self.threads + self.localThreads:
+ for thread in self.threads:
# skip if not belong to current user
if user is not None and thread.owner != user: continue
@@ -136,38 +100,8 @@ class ThreadManager:
return info
- def getActiveFiles(self):
- active = self.getActiveDownloads()
-
- for t in self.localThreads:
- active.extend(t.getActiveFiles())
-
- return active
-
- def processingIds(self):
- """get a id list of all pyfiles processed"""
- return [x.id for x in self.getActiveFiles()]
-
def work(self):
"""run all task which have to be done (this is for repetitive call by core)"""
- try:
- self.tryReconnect()
- except Exception, e:
- self.log.error(_("Reconnect Failed: %s") % str(e))
- self.reconnecting.clear()
- self.core.print_exc()
-
- self.checkThreadCount()
-
- try:
- self.assignJob()
- except Exception, e:
- self.log.warning("Assign job error", e)
- self.core.print_exc()
-
- sleep(0.5)
- self.assignJob()
- #it may be failed non critical so we try it again
if self.infoCache and self.timestamp < time():
self.infoCache.clear()
@@ -176,141 +110,3 @@ class ThreadManager:
for rid in self.infoResults.keys():
if self.infoResults[rid].isStale():
del self.infoResults[rid]
-
- def tryReconnect(self):
- """checks if reconnect needed"""
-
- if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()):
- return False
-
- active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active]
-
- if not (0 < active.count(True) == len(active)):
- return False
-
- if not exists(self.core.config['reconnect']['method']):
- if exists(join(pypath, self.core.config['reconnect']['method'])):
- self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
- else:
- self.core.config["reconnect"]["activated"] = False
- self.log.warning(_("Reconnect script not found!"))
- return
-
- self.reconnecting.set()
-
- #Do reconnect
- self.log.info(_("Starting reconnect"))
-
- while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0:
- sleep(0.25)
-
- ip = self.getIP()
-
- self.core.evm.dispatchEvent("reconnect:before", ip)
-
- self.log.debug("Old IP: %s" % ip)
-
- try:
- reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE)
- except:
- self.log.warning(_("Failed executing reconnect script!"))
- self.core.config["reconnect"]["activated"] = False
- self.reconnecting.clear()
- self.core.print_exc()
- return
-
- reconn.wait()
- sleep(1)
- ip = self.getIP()
- self.core.evm.dispatchEvent("reconnect:after", ip)
-
- self.log.info(_("Reconnected, new IP: %s") % ip)
-
- self.reconnecting.clear()
-
- def getIP(self):
- """retrieve current ip"""
- services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"),
- ("http://checkip.dyndns.org/", ".*Current IP Address: (\S+)</body>.*")]
-
- ip = ""
- for i in range(10):
- try:
- sv = choice(services)
- ip = getURL(sv[0])
- ip = re.match(sv[1], ip).group(1)
- break
- except:
- ip = ""
- sleep(1)
-
- return ip
-
- def checkThreadCount(self):
- """checks if there is a need for increasing or reducing thread count"""
-
- if len(self.threads) == self.core.config.get("download", "max_downloads"):
- return True
- elif len(self.threads) < self.core.config.get("download", "max_downloads"):
- self.createThread()
- else:
- free = [x for x in self.threads if not x.active]
- if free:
- free[0].put("quit")
-
-
- def cleanPycurl(self):
- """ make a global curl cleanup (currently unused) """
- if self.processingIds():
- return False
- import pycurl
-
- pycurl.global_cleanup()
- pycurl.global_init(pycurl.GLOBAL_DEFAULT)
- self.downloaded = 0
- self.log.debug("Cleaned up pycurl")
- return True
-
-
- def assignJob(self):
- """assign a job to a thread if possible"""
-
- if self.pause or not self.core.api.isTimeDownload(): return
-
- #if self.downloaded > 20:
- # if not self.cleanPyCurl(): return
-
- free = [x for x in self.threads if not x.active]
-
- inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if
- x.active and x.active.hasPlugin()]
- inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in
- inuse]
- occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]])))
-
- job = self.core.files.getJob(occ)
- if job:
- try:
- job.initPlugin()
- except Exception, e:
- self.log.critical(str(e))
- print_exc()
- job.setStatus("failed")
- job.error = str(e)
- job.release()
- return
-
- spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024
- if spaceLeft < self.core.config["general"]["min_free_space"]:
- self.log.warning(_("Not enough space left on device"))
- self.pause = True
-
- if free and not self.pause:
- thread = free[0]
- #self.downloaded += 1
- thread.put(job)
- else:
- #put job back
- if occ not in self.core.files.jobCache:
- self.core.files.jobCache[occ] = []
- self.core.files.jobCache[occ].append(job.id)