summaryrefslogtreecommitdiffstats
path: root/pyload/threads
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/threads')
-rw-r--r--pyload/threads/DecrypterThread.py1
-rw-r--r--pyload/threads/DownloadThread.py19
-rw-r--r--pyload/threads/ThreadManager.py220
3 files changed, 24 insertions, 216 deletions
diff --git a/pyload/threads/DecrypterThread.py b/pyload/threads/DecrypterThread.py
index 9f796da22..419f153a2 100644
--- a/pyload/threads/DecrypterThread.py
+++ b/pyload/threads/DecrypterThread.py
@@ -23,7 +23,6 @@ class DecrypterThread(BaseThread):
# holds the progress, while running
self.progress = None
- self.m.addThread(self)
self.start()
def getProgress(self):
diff --git a/pyload/threads/DownloadThread.py b/pyload/threads/DownloadThread.py
index d1672531b..b8f7e4965 100644
--- a/pyload/threads/DownloadThread.py
+++ b/pyload/threads/DownloadThread.py
@@ -18,6 +18,7 @@
@author: RaNaN
"""
+from threading import Event
from Queue import Queue
from time import sleep, time
from traceback import print_exc
@@ -37,6 +38,9 @@ class DownloadThread(BaseThread):
"""Constructor"""
BaseThread.__init__(self, manager)
+ self.isWorking = Event()
+ self.isWorking.clear()
+
self.queue = Queue() # job queue
self.active = None
@@ -53,12 +57,19 @@ class DownloadThread(BaseThread):
if self.active == "quit":
self.active = None
- self.m.threads.remove(self)
+ self.m.stop(self)
return True
try:
- if not pyfile.hasPlugin(): continue
+ pyfile.initPlugin()
+
+ # after initialization the thread is fully ready
+ self.isWorking.set()
+
#this pyfile was deleted while queuing
+ # TODO: what will happen with new thread manager?
+ #if not pyfile.hasPlugin(): continue
+
pyfile.plugin.checkForSameFiles(starting=True)
self.log.info(_("Download starts: %s" % pyfile.name))
@@ -204,7 +215,9 @@ class DownloadThread(BaseThread):
self.core.files.save()
pyfile.checkIfProcessed()
exc_clear()
-
+ # manager could still be waiting for it
+ self.isWorking.set()
+ self.m.done(self)
#pyfile.plugin.req.clean()
diff --git a/pyload/threads/ThreadManager.py b/pyload/threads/ThreadManager.py
index 298b0402d..f6cb3daea 100644
--- a/pyload/threads/ThreadManager.py
+++ b/pyload/threads/ThreadManager.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
###############################################################################
-# Copyright(c) 2008-2013 pyLoad Team
+# Copyright(c) 2008-2014 pyLoad Team
# http://www.pyload.org
#
# This file is part of pyLoad.
@@ -16,22 +16,11 @@
# @author: RaNaN
###############################################################################
-from os.path import exists, join
-import re
-from subprocess import Popen
-from threading import Event, RLock
-from time import sleep, time
-from traceback import print_exc
-from random import choice
+from threading import RLock
+from time import time
-from pyload.datatypes.PyFile import PyFile
from pyload.datatypes.OnlineCheck import OnlineCheck
-from pyload.network.RequestFactory import getURL
-from pyload.utils import lock, uniqify, to_list
-from pyload.utils.fs import free_space
-
-from DecrypterThread import DecrypterThread
-from DownloadThread import DownloadThread
+from pyload.utils import lock, to_list
from InfoThread import InfoThread
@@ -44,13 +33,6 @@ class ThreadManager:
self.log = core.log
self.threads = [] # thread list
- self.localThreads = [] #addon+decrypter threads
-
- self.pause = True
-
- self.reconnecting = Event()
- self.reconnecting.clear()
- self.downloaded = 0 #number of files downloaded since last cleanup
self.lock = RLock()
@@ -67,24 +49,15 @@ class ThreadManager:
# timeout for cache purge
self.timestamp = 0
- for i in range(self.core.config.get("download", "max_downloads")):
- self.createThread()
-
- def createThread(self):
- """create a download thread"""
-
- thread = DownloadThread(self)
- self.threads.append(thread)
-
@lock
def addThread(self, thread):
- self.localThreads.append(thread)
+ self.threads.append(thread)
@lock
def removeThread(self, thread):
""" Remove a thread from the local list """
- if thread in self.localThreads:
- self.localThreads.remove(thread)
+ if thread in self.threads:
+ self.threads.remove(thread)
@lock
def createInfoThread(self, data, pid):
@@ -108,11 +81,6 @@ class ThreadManager:
return rid
@lock
- def createDecryptThread(self, data, pid):
- """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
- if data: DecrypterThread(self, data, pid)
-
- @lock
def getInfoResult(self, rid):
return self.infoResults.get(rid)
@@ -120,14 +88,10 @@ class ThreadManager:
self.core.evm.dispatchEvent("linkcheck:updated", oc.rid, result, owner=oc.owner)
oc.update(result)
- def getActiveDownloads(self, user=None):
- # TODO: user context
- return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)]
-
def getProgressList(self, user=None):
info = []
- for thread in self.threads + self.localThreads:
+ for thread in self.threads:
# skip if not belong to current user
if user is not None and thread.owner != user: continue
@@ -136,38 +100,8 @@ class ThreadManager:
return info
- def getActiveFiles(self):
- active = self.getActiveDownloads()
-
- for t in self.localThreads:
- active.extend(t.getActiveFiles())
-
- return active
-
- def processingIds(self):
- """get a id list of all pyfiles processed"""
- return [x.id for x in self.getActiveFiles()]
-
def work(self):
"""run all task which have to be done (this is for repetitive call by core)"""
- try:
- self.tryReconnect()
- except Exception, e:
- self.log.error(_("Reconnect Failed: %s") % str(e))
- self.reconnecting.clear()
- self.core.print_exc()
-
- self.checkThreadCount()
-
- try:
- self.assignJob()
- except Exception, e:
- self.log.warning("Assign job error", e)
- self.core.print_exc()
-
- sleep(0.5)
- self.assignJob()
- #it may be failed non critical so we try it again
if self.infoCache and self.timestamp < time():
self.infoCache.clear()
@@ -176,141 +110,3 @@ class ThreadManager:
for rid in self.infoResults.keys():
if self.infoResults[rid].isStale():
del self.infoResults[rid]
-
- def tryReconnect(self):
- """checks if reconnect needed"""
-
- if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()):
- return False
-
- active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active]
-
- if not (0 < active.count(True) == len(active)):
- return False
-
- if not exists(self.core.config['reconnect']['method']):
- if exists(join(pypath, self.core.config['reconnect']['method'])):
- self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
- else:
- self.core.config["reconnect"]["activated"] = False
- self.log.warning(_("Reconnect script not found!"))
- return
-
- self.reconnecting.set()
-
- #Do reconnect
- self.log.info(_("Starting reconnect"))
-
- while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0:
- sleep(0.25)
-
- ip = self.getIP()
-
- self.core.evm.dispatchEvent("reconnect:before", ip)
-
- self.log.debug("Old IP: %s" % ip)
-
- try:
- reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE)
- except:
- self.log.warning(_("Failed executing reconnect script!"))
- self.core.config["reconnect"]["activated"] = False
- self.reconnecting.clear()
- self.core.print_exc()
- return
-
- reconn.wait()
- sleep(1)
- ip = self.getIP()
- self.core.evm.dispatchEvent("reconnect:after", ip)
-
- self.log.info(_("Reconnected, new IP: %s") % ip)
-
- self.reconnecting.clear()
-
- def getIP(self):
- """retrieve current ip"""
- services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"),
- ("http://checkip.dyndns.org/", ".*Current IP Address: (\S+)</body>.*")]
-
- ip = ""
- for i in range(10):
- try:
- sv = choice(services)
- ip = getURL(sv[0])
- ip = re.match(sv[1], ip).group(1)
- break
- except:
- ip = ""
- sleep(1)
-
- return ip
-
- def checkThreadCount(self):
- """checks if there is a need for increasing or reducing thread count"""
-
- if len(self.threads) == self.core.config.get("download", "max_downloads"):
- return True
- elif len(self.threads) < self.core.config.get("download", "max_downloads"):
- self.createThread()
- else:
- free = [x for x in self.threads if not x.active]
- if free:
- free[0].put("quit")
-
-
- def cleanPycurl(self):
- """ make a global curl cleanup (currently unused) """
- if self.processingIds():
- return False
- import pycurl
-
- pycurl.global_cleanup()
- pycurl.global_init(pycurl.GLOBAL_DEFAULT)
- self.downloaded = 0
- self.log.debug("Cleaned up pycurl")
- return True
-
-
- def assignJob(self):
- """assign a job to a thread if possible"""
-
- if self.pause or not self.core.api.isTimeDownload(): return
-
- #if self.downloaded > 20:
- # if not self.cleanPyCurl(): return
-
- free = [x for x in self.threads if not x.active]
-
- inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if
- x.active and x.active.hasPlugin()]
- inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in
- inuse]
- occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]])))
-
- job = self.core.files.getJob(occ)
- if job:
- try:
- job.initPlugin()
- except Exception, e:
- self.log.critical(str(e))
- print_exc()
- job.setStatus("failed")
- job.error = str(e)
- job.release()
- return
-
- spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024
- if spaceLeft < self.core.config["general"]["min_free_space"]:
- self.log.warning(_("Not enough space left on device"))
- self.pause = True
-
- if free and not self.pause:
- thread = free[0]
- #self.downloaded += 1
- thread.put(job)
- else:
- #put job back
- if occ not in self.core.files.jobCache:
- self.core.files.jobCache[occ] = []
- self.core.files.jobCache[occ].append(job.id)