summaryrefslogtreecommitdiffstats
path: root/module/threads
diff options
context:
space:
mode:
Diffstat (limited to 'module/threads')
-rw-r--r--module/threads/BaseThread.py136
-rw-r--r--module/threads/DecrypterThread.py80
-rw-r--r--module/threads/DownloadThread.py215
-rw-r--r--module/threads/HookThread.py65
-rw-r--r--module/threads/InfoThread.py168
-rw-r--r--module/threads/ThreadManager.py300
-rw-r--r--module/threads/__init__.py0
7 files changed, 964 insertions, 0 deletions
diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py
new file mode 100644
index 000000000..f6fac46a0
--- /dev/null
+++ b/module/threads/BaseThread.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import sys
+import locale
+
+from threading import Thread
+from time import strftime, gmtime
+from sys import exc_info
+from types import MethodType
+from pprint import pformat
+from traceback import format_exc
+
+from module.utils.fs import listdir, join, save_join, stat, exists
+
+class BaseThread(Thread):
+ """abstract base class for thread types"""
+
+ def __init__(self, manager):
+ Thread.__init__(self)
+ self.setDaemon(True)
+ self.m = manager #thread manager
+ self.core = manager.core
+ self.log = manager.core.log
+
+ def writeDebugReport(self, name, pyfile=None, plugin=None):
+ """ writes a debug report to disk """
+
+ dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S"))
+ if pyfile:
+ dump = self.getFileDump(pyfile)
+ else:
+ dump = self.getPluginDump(plugin)
+
+ try:
+ import zipfile
+
+ zip = zipfile.ZipFile(dump_name, "w")
+
+ if exists(join("tmp", name)):
+ for f in listdir(join("tmp", name)):
+ try:
+ # avoid encoding errors
+ zip.write(join("tmp", name, f), save_join(name, f))
+ except:
+ pass
+
+ info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime())
+ info.external_attr = 0644 << 16L # change permissions
+ zip.writestr(info, dump)
+
+ info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime())
+ info.external_attr = 0644 << 16L
+ zip.writestr(info, self.getSystemDump())
+
+ zip.close()
+
+ if not stat(dump_name).st_size:
+ raise Exception("Empty Zipfile")
+
+ except Exception, e:
+ self.log.debug("Error creating zip file: %s" % e)
+
+ dump_name = dump_name.replace(".zip", ".txt")
+ f = open(dump_name, "wb")
+ f.write(dump)
+ f.close()
+
+ self.log.info("Debug Report written to %s" % dump_name)
+ return dump_name
+
+ def getFileDump(self, pyfile):
+ dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
+ self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc())
+
+ tb = exc_info()[2]
+ stack = []
+ while tb:
+ stack.append(tb.tb_frame)
+ tb = tb.tb_next
+
+ for frame in stack[1:]:
+ dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name,
+ frame.f_code.co_filename,
+ frame.f_lineno)
+
+ for key, value in frame.f_locals.items():
+ dump += "\t%20s = " % key
+ try:
+ dump += pformat(value) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+
+ del frame
+
+ del stack #delete it just to be sure...
+
+ dump += "\n\nPLUGIN OBJECT DUMP: \n\n"
+
+ for name in dir(pyfile.plugin):
+ attr = getattr(pyfile.plugin, name)
+ if not name.endswith("__") and type(attr) != MethodType:
+ dump += "\t%20s = " % name
+ try:
+ dump += pformat(attr) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+
+ dump += "\nPYFILE OBJECT DUMP: \n\n"
+
+ for name in dir(pyfile):
+ attr = getattr(pyfile, name)
+ if not name.endswith("__") and type(attr) != MethodType:
+ dump += "\t%20s = " % name
+ try:
+ dump += pformat(attr) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+
+ dump += "\n\nCONFIG: \n\n"
+ dump += pformat(self.m.core.config.values) + "\n"
+
+ return dump
+
+ #TODO
+ def getPluginDump(self, plugin):
+ return ""
+
+ def getSystemDump(self):
+ return ""
+
+ def clean(self, pyfile):
+ """ set thread unactive and release pyfile """
+ self.active = False
+ pyfile.release()
diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py
new file mode 100644
index 000000000..ce3c8cd83
--- /dev/null
+++ b/module/threads/DecrypterThread.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from time import sleep
+from traceback import print_exc
+
+from module.utils import uniqify
+from module.plugins.Base import Retry
+from module.plugins.Crypter import Package
+
+from BaseThread import BaseThread
+
+class DecrypterThread(BaseThread):
+ """thread for decrypting"""
+
+ def __init__(self, manager, data, pid):
+ """constructor"""
+ BaseThread.__init__(self, manager)
+ self.data = data
+ self.pid = pid
+
+ self.start()
+
+ def run(self):
+ plugin_map = {}
+ for url, plugin in self.data:
+ if plugin in plugin_map:
+ plugin_map[plugin].append(url)
+ else:
+ plugin_map[plugin] = [url]
+
+ self.decrypt(plugin_map)
+
+ def decrypt(self, plugin_map):
+ pack = self.m.core.files.getPackage(self.pid)
+ result = []
+
+ for name, urls in plugin_map.iteritems():
+ klass = self.m.core.pluginManager.loadClass("crypter", name)
+ plugin = klass(self.m.core, pack, pack.password)
+ plugin_result = []
+
+ try:
+ try:
+ plugin_result = plugin._decrypt(urls)
+ except Retry:
+ sleep(1)
+ plugin_result = plugin._decrypt(urls)
+ except Exception, e:
+ plugin.logError(_("Decrypting failed"), e)
+ if self.m.core.debug:
+ print_exc()
+ self.writeDebugReport(plugin.__name__, plugin=plugin)
+
+ plugin.logDebug("Decrypted", plugin_result)
+ result.extend(plugin_result)
+
+ result = uniqify(result)
+ pack_names = {}
+ urls = []
+
+ for p in result:
+ if isinstance(p, Package):
+ if p.name in pack_names:
+ pack_names[p.name].urls.extend(p.urls)
+ else:
+ pack_names[p.name] = p
+ else:
+ urls.append(p)
+
+ if urls:
+ self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name})
+ self.m.core.api.addFiles(self.pid, urls)
+
+ for p in pack_names.itervalues():
+ self.m.core.api.addPackage(p.name, p.urls, p.dest, pack.password)
+
+ if not result:
+ self.log.info(_("No links decrypted"))
+
diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py
new file mode 100644
index 000000000..c151831a3
--- /dev/null
+++ b/module/threads/DownloadThread.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+
+ @author: RaNaN
+"""
+
+from Queue import Queue
+from time import sleep, time
+from traceback import print_exc
+from sys import exc_clear
+from pycurl import error
+
+from module.plugins.Base import Fail, Retry
+from module.plugins.Hoster import Abort, Reconnect, SkipDownload
+
+from BaseThread import BaseThread
+
+class DownloadThread(BaseThread):
+ """thread for downloading files from 'real' hoster plugins"""
+
+ def __init__(self, manager):
+ """Constructor"""
+ BaseThread.__init__(self, manager)
+
+ self.queue = Queue() # job queue
+ self.active = False
+
+ self.start()
+
+ def run(self):
+ """run method"""
+ pyfile = None
+
+ while True:
+ del pyfile
+ self.active = self.queue.get()
+ pyfile = self.active
+
+ if self.active == "quit":
+ self.active = False
+ self.m.threads.remove(self)
+ return True
+
+ try:
+ if not pyfile.hasPlugin(): continue
+ #this pyfile was deleted while queueing
+
+ pyfile.plugin.checkForSameFiles(starting=True)
+ self.log.info(_("Download starts: %s" % pyfile.name))
+
+ # start download
+ self.core.hookManager.downloadPreparing(pyfile)
+ pyfile.plugin.preprocessing(self)
+
+ self.log.info(_("Download finished: %s") % pyfile.name)
+ self.core.hookManager.downloadFinished(pyfile)
+ self.core.files.checkPackageFinished(pyfile)
+
+ except NotImplementedError:
+ self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname)
+ pyfile.setStatus("failed")
+ pyfile.error = "Plugin does not work"
+ self.clean(pyfile)
+ continue
+
+ except Abort:
+ try:
+ self.log.info(_("Download aborted: %s") % pyfile.name)
+ except:
+ pass
+
+ pyfile.setStatus("aborted")
+
+ self.clean(pyfile)
+ continue
+
+ except Reconnect:
+ self.queue.put(pyfile)
+ #pyfile.req.clearCookies()
+
+ while self.m.reconnecting.isSet():
+ sleep(0.5)
+
+ continue
+
+ except Retry, e:
+ reason = e.args[0]
+ self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason})
+ self.queue.put(pyfile)
+ continue
+
+ except Fail, e:
+ msg = e.args[0]
+
+ if msg == "offline":
+ pyfile.setStatus("offline")
+ self.log.warning(_("Download is offline: %s") % pyfile.name)
+ elif msg == "temp. offline":
+ pyfile.setStatus("temp. offline")
+ self.log.warning(_("Download is temporary offline: %s") % pyfile.name)
+ else:
+ pyfile.setStatus("failed")
+ self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg})
+ pyfile.error = msg
+
+ self.core.hookManager.downloadFailed(pyfile)
+ self.clean(pyfile)
+ continue
+
+ except error, e:
+ if len(e.args) == 2:
+ code, msg = e.args
+ else:
+ code = 0
+ msg = e.args
+
+ self.log.debug("pycurl exception %s: %s" % (code, msg))
+
+ if code in (7, 18, 28, 52, 56):
+ self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry."))
+ wait = time() + 60
+
+ pyfile.waitUntil = wait
+ pyfile.setStatus("waiting")
+ while time() < wait:
+ sleep(1)
+ if pyfile.abort:
+ break
+
+ if pyfile.abort:
+ self.log.info(_("Download aborted: %s") % pyfile.name)
+ pyfile.setStatus("aborted")
+
+ self.clean(pyfile)
+ else:
+ self.queue.put(pyfile)
+
+ continue
+
+ else:
+ pyfile.setStatus("failed")
+ self.log.error("pycurl error %s: %s" % (code, msg))
+ if self.core.debug:
+ print_exc()
+ self.writeDebugReport(pyfile.plugin.__name__, pyfile)
+
+ self.core.hookManager.downloadFailed(pyfile)
+
+ self.clean(pyfile)
+ continue
+
+ except SkipDownload, e:
+ pyfile.setStatus("skipped")
+
+ self.log.info(_("Download skipped: %(name)s due to %(plugin)s")
+ % {"name": pyfile.name, "plugin": e.message})
+
+ self.clean(pyfile)
+
+ self.core.files.checkPackageFinished(pyfile)
+
+ self.active = False
+ self.core.files.save()
+
+ continue
+
+
+ except Exception, e:
+ pyfile.setStatus("failed")
+ self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)})
+ pyfile.error = str(e)
+
+ if self.core.debug:
+ print_exc()
+ self.writeDebugReport(pyfile.plugin.__name__, pyfile)
+
+ self.core.hookManager.downloadFailed(pyfile)
+ self.clean(pyfile)
+ continue
+
+ finally:
+ self.core.files.save()
+ pyfile.checkIfProcessed()
+ exc_clear()
+
+
+ #pyfile.plugin.req.clean()
+
+ self.active = False
+ pyfile.finishIfDone()
+ self.core.files.save()
+
+
+ def put(self, job):
+ """assing job to thread"""
+ self.queue.put(job)
+
+
+ def stop(self):
+ """stops the thread"""
+ self.put("quit") \ No newline at end of file
diff --git a/module/threads/HookThread.py b/module/threads/HookThread.py
new file mode 100644
index 000000000..bffa72ca0
--- /dev/null
+++ b/module/threads/HookThread.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from copy import copy
+from traceback import print_exc
+
+from BaseThread import BaseThread
+
+class HookThread(BaseThread):
+ """thread for hooks"""
+
+ def __init__(self, m, function, args, kwargs):
+ """Constructor"""
+ BaseThread.__init__(self, m)
+
+ self.f = function
+ self.args = args
+ self.kwargs = kwargs
+
+ self.active = []
+
+ m.localThreads.append(self)
+
+ self.start()
+
+ def getActiveFiles(self):
+ return self.active
+
+ def addActive(self, pyfile):
+ """ Adds a pyfile to active list and thus will be displayed on overview"""
+ if pyfile not in self.active:
+ self.active.append(pyfile)
+
+ def finishFile(self, pyfile):
+ if pyfile in self.active:
+ self.active.remove(pyfile)
+
+ pyfile.finishIfDone()
+
+ def run(self):
+ try:
+ try:
+ self.kwargs["thread"] = self
+ self.f(*self.args, **self.kwargs)
+ except TypeError, e:
+ #dirty method to filter out exceptions
+ if "unexpected keyword argument 'thread'" not in e.args[0]:
+ raise
+
+ del self.kwargs["thread"]
+ self.f(*self.args, **self.kwargs)
+ except Exception, e:
+ if hasattr(self.f, "im_self"):
+ hook = self.f.im_self
+ hook.logError(_("An Error occured"), e)
+ if self.m.core.debug:
+ print_exc()
+ self.writeDebugReport(hook.__name__, plugin=hook)
+
+ finally:
+ local = copy(self.active)
+ for x in local:
+ self.finishFile(x)
+
+ self.m.localThreads.remove(self) \ No newline at end of file
diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py
new file mode 100644
index 000000000..7db85803a
--- /dev/null
+++ b/module/threads/InfoThread.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from time import time
+from traceback import print_exc
+
+from module.Api import OnlineStatus
+from module.common.packagetools import parseNames
+from module.utils import has_method, accumulate
+
+from BaseThread import BaseThread
+
+class InfoThread(BaseThread):
+ def __init__(self, manager, data, pid=-1, rid=-1):
+ """Constructor"""
+ BaseThread.__init__(self, manager)
+
+ self.data = data
+ self.pid = pid # package id
+ # [ .. (name, plugin) .. ]
+
+ self.rid = rid #result id
+
+ self.cache = [] #accumulated data
+
+ self.start()
+
+ def run(self):
+ """run method"""
+
+ plugins = accumulate(self.data)
+ crypter = {}
+
+ # filter out crypter plugins
+ for name in self.m.core.pluginManager.getPlugins("crypter"):
+ if name in plugins:
+ crypter[name] = plugins[name]
+ del plugins[name]
+
+ #directly write to database
+ if self.pid > -1:
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPluginModule(pluginname)
+ klass = self.m.core.pluginManager.getPluginClass(pluginname)
+ if has_method(klass, "getInfo"):
+ self.fetchForPlugin(pluginname, klass, urls, self.updateDB)
+ self.m.core.files.save()
+ elif has_method(plugin, "getInfo"):
+ self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead")
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
+ self.m.core.files.save()
+
+ else: #post the results
+ for name, urls in crypter:
+ #attach container content
+ try:
+ data = self.decrypt(name, urls)
+ except:
+ print_exc()
+ self.m.log.error("Could not decrypt container.")
+ data = []
+
+ accumulate(data, plugins)
+
+ self.m.infoResults[self.rid] = {}
+
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
+ klass = getattr(plugin, pluginname)
+ if has_method(klass, "getInfo"):
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
+ #force to process cache
+ if self.cache:
+ self.updateResult(pluginname, [], True)
+ elif has_method(plugin, "getInfo"):
+ self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead")
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
+ #force to process cache
+ if self.cache:
+ self.updateResult(pluginname, [], True)
+ else:
+ #generate default result
+ result = [(url, 0, 3, url) for url in urls]
+
+ self.updateResult(pluginname, result, True)
+
+ self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {}
+
+ self.m.timestamp = time() + 5 * 60
+
+
+ def updateDB(self, plugin, result):
+ self.m.core.files.updateFileInfo(result, self.pid)
+
+ def updateResult(self, plugin, result, force=False):
+ #parse package name and generate result
+ #accumulate results
+
+ self.cache.extend(result)
+
+ if len(self.cache) >= 20 or force:
+ #used for package generating
+ tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size))))
+ for name, size, status, url in self.cache]
+
+ data = parseNames(tmp)
+ result = {}
+ for k, v in data.iteritems():
+ for url, status in v:
+ status.packagename = k
+ result[url] = status
+
+ self.m.setInfoResults(self.rid, result)
+
+ self.cache = []
+
+ def updateCache(self, plugin, result):
+ self.cache.extend(result)
+
+ def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
+ try:
+ result = [] #result loaded from cache
+ process = [] #urls to process
+ for url in urls:
+ if url in self.m.infoCache:
+ result.append(self.m.infoCache[url])
+ else:
+ process.append(url)
+
+ if result:
+ self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
+ cb(pluginname, result)
+
+ if process:
+ self.m.log.debug("Run Info Fetching for %s" % pluginname)
+ for result in plugin.getInfo(process):
+ #result = [ .. (name, size, status, url) .. ]
+ if not type(result) == list: result = [result]
+
+ for res in result:
+ self.m.infoCache[res[3]] = res
+
+ cb(pluginname, result)
+
+ self.m.log.debug("Finished Info Fetching for %s" % pluginname)
+ except Exception, e:
+ self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
+ {"name": pluginname, "err": str(e)})
+ if self.m.core.debug:
+ print_exc()
+
+ # generate default results
+ if err:
+ result = [(url, 0, 3, url) for url in urls]
+ cb(pluginname, result)
+
+
+ def decrypt(self, plugin, urls):
+ self.m.log.debug("Pre decrypting %s" % plugin)
+ klass = self.m.core.pluginManager.loadClass("crypter", plugin)
+
+ # only decrypt files
+ if has_method(klass, "decryptFile"):
+ urls = p.decrypt(urls)
+ data, crypter = self.m.core.pluginManager.parseUrls(urls)
+ return data
+
+ return []
diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py
new file mode 100644
index 000000000..f8b5c0aba
--- /dev/null
+++ b/module/threads/ThreadManager.py
@@ -0,0 +1,300 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+
+ @author: RaNaN
+"""
+
+from os.path import exists, join
+import re
+from subprocess import Popen
+from threading import Event, Lock
+from time import sleep, time
+from traceback import print_exc
+from random import choice
+
+import pycurl
+
+from module.PyFile import PyFile
+from module.network.RequestFactory import getURL
+from module.utils import lock, uniqify
+from module.utils.fs import free_space
+
+from DecrypterThread import DecrypterThread
+from DownloadThread import DownloadThread
+from InfoThread import InfoThread
+
+class ThreadManager:
+ """manages the download threads, assign jobs, reconnect etc"""
+
+
+ def __init__(self, core):
+ """Constructor"""
+ self.core = core
+ self.log = core.log
+
+ self.threads = [] # thread list
+ self.localThreads = [] #hook+decrypter threads
+
+ self.pause = True
+
+ self.reconnecting = Event()
+ self.reconnecting.clear()
+ self.downloaded = 0 #number of files downloaded since last cleanup
+
+ self.lock = Lock()
+
+ # some operations require to fetch url info from hoster, so we caching them so it wont be done twice
+ # contains a timestamp and will be purged after timeout
+ self.infoCache = {}
+
+ # pool of ids for online check
+ self.resultIDs = 0
+
+ # threads which are fetching hoster results
+ self.infoResults = {}
+ # timeout for cache purge
+ self.timestamp = 0
+
+ pycurl.global_init(pycurl.GLOBAL_DEFAULT)
+
+ for i in range(self.core.config.get("download", "max_downloads")):
+ self.createThread()
+
+
+ def createThread(self):
+ """create a download thread"""
+
+ thread = DownloadThread(self)
+ self.threads.append(thread)
+
+ def createInfoThread(self, data, pid):
+ """ start a thread whichs fetches online status and other infos """
+ self.timestamp = time() + 5 * 60
+ if data: InfoThread(self, data, pid)
+
+ @lock
+ def createResultThread(self, data):
+ """ creates a thread to fetch online status, returns result id """
+ self.timestamp = time() + 5 * 60
+
+ rid = self.resultIDs
+ self.resultIDs += 1
+
+ InfoThread(self, data, rid=rid)
+
+ return rid
+
+ @lock
+ def createDecryptThread(self, data, pid):
+ """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
+ if data: DecrypterThread(self, data, pid)
+
+
+ @lock
+ def getInfoResult(self, rid):
+ """returns result and clears it"""
+ self.timestamp = time() + 5 * 60
+
+ if rid in self.infoResults:
+ data = self.infoResults[rid]
+ self.infoResults[rid] = {}
+ return data
+ else:
+ return {}
+
+ @lock
+ def setInfoResults(self, rid, result):
+ self.infoResults[rid].update(result)
+
+ def getActiveFiles(self):
+ active = [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)]
+
+ for t in self.localThreads:
+ active.extend(t.getActiveFiles())
+
+ return active
+
+ def processingIds(self):
+ """get a id list of all pyfiles processed"""
+ return [x.id for x in self.getActiveFiles()]
+
+
+ def work(self):
+ """run all task which have to be done (this is for repetivive call by core)"""
+ try:
+ self.tryReconnect()
+ except Exception, e:
+ self.log.error(_("Reconnect Failed: %s") % str(e) )
+ self.reconnecting.clear()
+ if self.core.debug:
+ print_exc()
+ self.checkThreadCount()
+
+ try:
+ self.assignJob()
+ except Exception, e:
+ self.log.warning("Assign job error", e)
+ if self.core.debug:
+ print_exc()
+
+ sleep(0.5)
+ self.assignJob()
+ #it may be failed non critical so we try it again
+
+ if (self.infoCache or self.infoResults) and self.timestamp < time():
+ self.infoCache.clear()
+ self.infoResults.clear()
+ self.log.debug("Cleared Result cache")
+
+ def tryReconnect(self):
+ """checks if reconnect needed"""
+
+ if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()):
+ return False
+
+ active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active]
+
+ if not (0 < active.count(True) == len(active)):
+ return False
+
+ if not exists(self.core.config['reconnect']['method']):
+ if exists(join(pypath, self.core.config['reconnect']['method'])):
+ self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
+ else:
+ self.core.config["reconnect"]["activated"] = False
+ self.log.warning(_("Reconnect script not found!"))
+ return
+
+ self.reconnecting.set()
+
+ #Do reconnect
+ self.log.info(_("Starting reconnect"))
+
+ while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0:
+ sleep(0.25)
+
+ ip = self.getIP()
+
+ self.core.hookManager.beforeReconnecting(ip)
+
+ self.log.debug("Old IP: %s" % ip)
+
+ try:
+ reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE)
+ except:
+ self.log.warning(_("Failed executing reconnect script!"))
+ self.core.config["reconnect"]["activated"] = False
+ self.reconnecting.clear()
+ if self.core.debug:
+ print_exc()
+ return
+
+ reconn.wait()
+ sleep(1)
+ ip = self.getIP()
+ self.core.hookManager.afterReconnecting(ip)
+
+ self.log.info(_("Reconnected, new IP: %s") % ip)
+
+ self.reconnecting.clear()
+
+ def getIP(self):
+ """retrieve current ip"""
+ services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"),
+ ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")]
+
+ ip = ""
+ for i in range(10):
+ try:
+ sv = choice(services)
+ ip = getURL(sv[0])
+ ip = re.match(sv[1], ip).group(1)
+ break
+ except:
+ ip = ""
+ sleep(1)
+
+ return ip
+
+ def checkThreadCount(self):
+ """checks if there are need for increasing or reducing thread count"""
+
+ if len(self.threads) == self.core.config.get("download", "max_downloads"):
+ return True
+ elif len(self.threads) < self.core.config.get("download", "max_downloads"):
+ self.createThread()
+ else:
+ free = [x for x in self.threads if not x.active]
+ if free:
+ free[0].put("quit")
+
+
+ def cleanPycurl(self):
+ """ make a global curl cleanup (currently ununused) """
+ if self.processingIds():
+ return False
+ pycurl.global_cleanup()
+ pycurl.global_init(pycurl.GLOBAL_DEFAULT)
+ self.downloaded = 0
+ self.log.debug("Cleaned up pycurl")
+ return True
+
+
+ def assignJob(self):
+ """assing a job to a thread if possible"""
+
+ if self.pause or not self.core.api.isTimeDownload(): return
+
+ #if self.downloaded > 20:
+ # if not self.cleanPyCurl(): return
+
+ free = [x for x in self.threads if not x.active]
+
+ inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()]
+ inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse]
+ occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]])))
+
+ job = self.core.files.getJob(occ)
+ if job:
+ try:
+ job.initPlugin()
+ except Exception, e:
+ self.log.critical(str(e))
+ print_exc()
+ job.setStatus("failed")
+ job.error = str(e)
+ job.release()
+ return
+
+ spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024
+ if spaceLeft < self.core.config["general"]["min_free_space"]:
+ self.log.warning(_("Not enough space left on device"))
+ self.pause = True
+
+ if free and not self.pause:
+ thread = free[0]
+ #self.downloaded += 1
+ thread.put(job)
+ else:
+ #put job back
+ if occ not in self.core.files.jobCache:
+ self.core.files.jobCache[occ] = []
+ self.core.files.jobCache[occ].append(job.id)
+
+ def cleanup(self):
+ """do global cleanup, should be called when finished with pycurl"""
+ pycurl.global_cleanup()
diff --git a/module/threads/__init__.py b/module/threads/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/module/threads/__init__.py