summaryrefslogtreecommitdiffstats
path: root/module/threads
diff options
context:
space:
mode:
Diffstat (limited to 'module/threads')
-rw-r--r--module/threads/AddonThread.py65
-rw-r--r--module/threads/BaseThread.py142
-rw-r--r--module/threads/DecrypterThread.py81
-rw-r--r--module/threads/DownloadThread.py231
-rw-r--r--module/threads/InfoThread.py168
-rw-r--r--module/threads/ThreadManager.py313
-rw-r--r--module/threads/__init__.py0
7 files changed, 0 insertions, 1000 deletions
diff --git a/module/threads/AddonThread.py b/module/threads/AddonThread.py
deleted file mode 100644
index afb56f66b..000000000
--- a/module/threads/AddonThread.py
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from copy import copy
-from traceback import print_exc
-
-from BaseThread import BaseThread
-
-class AddonThread(BaseThread):
- """thread for addons"""
-
- def __init__(self, m, function, args, kwargs):
- """Constructor"""
- BaseThread.__init__(self, m)
-
- self.f = function
- self.args = args
- self.kwargs = kwargs
-
- self.active = []
-
- m.localThreads.append(self)
-
- self.start()
-
- def getActiveFiles(self):
- return self.active
-
- def addActive(self, pyfile):
- """ Adds a pyfile to active list and thus will be displayed on overview"""
- if pyfile not in self.active:
- self.active.append(pyfile)
-
- def finishFile(self, pyfile):
- if pyfile in self.active:
- self.active.remove(pyfile)
-
- pyfile.finishIfDone()
-
- def run(self): #TODO: approach via func_code
- try:
- try:
- self.kwargs["thread"] = self
- self.f(*self.args, **self.kwargs)
- except TypeError, e:
- #dirty method to filter out exceptions
- if "unexpected keyword argument 'thread'" not in e.args[0]:
- raise
-
- del self.kwargs["thread"]
- self.f(*self.args, **self.kwargs)
- except Exception, e:
- if hasattr(self.f, "im_self"):
- addon = self.f.im_self
- addon.logError(_("An Error occurred"), e)
- if self.m.core.debug:
- print_exc()
- self.writeDebugReport(addon.__name__, plugin=addon)
-
- finally:
- local = copy(self.active)
- for x in local:
- self.finishFile(x)
-
- self.m.localThreads.remove(self) \ No newline at end of file
diff --git a/module/threads/BaseThread.py b/module/threads/BaseThread.py
deleted file mode 100644
index c64678a72..000000000
--- a/module/threads/BaseThread.py
+++ /dev/null
@@ -1,142 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from threading import Thread
-from time import strftime, gmtime
-from sys import exc_info
-from types import MethodType
-from pprint import pformat
-from traceback import format_exc
-
-from module.utils import primary_uid
-from module.utils.fs import listdir, join, save_join, stat, exists
-
-class BaseThread(Thread):
- """abstract base class for thread types"""
-
- def __init__(self, manager):
- Thread.__init__(self)
- self.setDaemon(True)
- self.m = manager #thread manager
- self.core = manager.core
- self.log = manager.core.log
-
- #: Owner of the thread, every type should set it
- self.owner = None
-
- @property
- def user(self):
- return primary_uid(self.owner)
-
- def getProgress(self):
- """ retrieves progress information about the current running task
-
- :return: :class:`ProgressInfo`
- """
-
- # Debug Stuff
- def writeDebugReport(self, name, pyfile=None, plugin=None):
- """ writes a debug report to disk """
-
- dump_name = "debug_%s_%s.zip" % (name, strftime("%d-%m-%Y_%H-%M-%S"))
- if pyfile:
- dump = self.getFileDump(pyfile)
- else:
- dump = self.getPluginDump(plugin)
-
- try:
- import zipfile
-
- zip = zipfile.ZipFile(dump_name, "w")
-
- if exists(join("tmp", name)):
- for f in listdir(join("tmp", name)):
- try:
- # avoid encoding errors
- zip.write(join("tmp", name, f), save_join(name, f))
- except:
- pass
-
- info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime())
- info.external_attr = 0644 << 16L # change permissions
- zip.writestr(info, dump)
-
- info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime())
- info.external_attr = 0644 << 16L
- zip.writestr(info, self.getSystemDump())
-
- zip.close()
-
- if not stat(dump_name).st_size:
- raise Exception("Empty Zipfile")
-
- except Exception, e:
- self.log.debug("Error creating zip file: %s" % e)
-
- dump_name = dump_name.replace(".zip", ".txt")
- f = open(dump_name, "wb")
- f.write(dump)
- f.close()
-
- self.log.info("Debug Report written to %s" % dump_name)
- return dump_name
-
- def getFileDump(self, pyfile):
- dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
- self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc())
-
- tb = exc_info()[2]
- stack = []
- while tb:
- stack.append(tb.tb_frame)
- tb = tb.tb_next
-
- for frame in stack[1:]:
- dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name,
- frame.f_code.co_filename,
- frame.f_lineno)
-
- for key, value in frame.f_locals.items():
- dump += "\t%20s = " % key
- try:
- dump += pformat(value) + "\n"
- except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
-
- del frame
-
- del stack #delete it just to be sure...
-
- dump += "\n\nPLUGIN OBJECT DUMP: \n\n"
-
- for name in dir(pyfile.plugin):
- attr = getattr(pyfile.plugin, name)
- if not name.endswith("__") and type(attr) != MethodType:
- dump += "\t%20s = " % name
- try:
- dump += pformat(attr) + "\n"
- except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
-
- dump += "\nPYFILE OBJECT DUMP: \n\n"
-
- for name in dir(pyfile):
- attr = getattr(pyfile, name)
- if not name.endswith("__") and type(attr) != MethodType:
- dump += "\t%20s = " % name
- try:
- dump += pformat(attr) + "\n"
- except Exception, e:
- dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
-
- dump += "\n\nCONFIG: \n\n"
- dump += pformat(self.m.core.config.values) + "\n"
-
- return dump
-
- #TODO
- def getPluginDump(self, plugin):
- return ""
-
- def getSystemDump(self):
- return ""
diff --git a/module/threads/DecrypterThread.py b/module/threads/DecrypterThread.py
deleted file mode 100644
index 39448a620..000000000
--- a/module/threads/DecrypterThread.py
+++ /dev/null
@@ -1,81 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from time import sleep
-from traceback import print_exc
-
-from module.utils import uniqify
-from module.plugins.Base import Retry
-from module.plugins.Crypter import Package
-
-from BaseThread import BaseThread
-
-class DecrypterThread(BaseThread):
- """thread for decrypting"""
-
- def __init__(self, manager, data, pid):
- """constructor"""
- BaseThread.__init__(self, manager)
- self.data = data
- self.pid = pid
-
- self.start()
-
- def run(self):
- plugin_map = {}
- for url, plugin in self.data:
- if plugin in plugin_map:
- plugin_map[plugin].append(url)
- else:
- plugin_map[plugin] = [url]
-
- self.decrypt(plugin_map)
-
- def decrypt(self, plugin_map):
- pack = self.m.core.files.getPackage(self.pid)
- result = []
-
- for name, urls in plugin_map.iteritems():
- klass = self.m.core.pluginManager.loadClass("crypter", name)
- plugin = klass(self.m.core, pack, pack.password)
- plugin_result = []
-
- try:
- try:
- plugin_result = plugin._decrypt(urls)
- except Retry:
- sleep(1)
- plugin_result = plugin._decrypt(urls)
- except Exception, e:
- plugin.logError(_("Decrypting failed"), e)
- if self.m.core.debug:
- print_exc()
- self.writeDebugReport(plugin.__name__, plugin=plugin)
-
- plugin.logDebug("Decrypted", plugin_result)
- result.extend(plugin_result)
-
- #TODO
- result = uniqify(result)
- pack_names = {}
- urls = []
-
- for p in result:
- if isinstance(p, Package):
- if p.name in pack_names:
- pack_names[p.name].urls.extend(p.urls)
- else:
- pack_names[p.name] = p
- else:
- urls.append(p)
-
- if urls:
- self.log.info(_("Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name": pack.name})
- self.m.core.api.addFiles(self.pid, urls)
-
- for p in pack_names.itervalues():
- self.m.core.api.addPackage(p.name, p.urls, pack.password)
-
- if not result:
- self.log.info(_("No links decrypted"))
-
diff --git a/module/threads/DownloadThread.py b/module/threads/DownloadThread.py
deleted file mode 100644
index cf59c5639..000000000
--- a/module/threads/DownloadThread.py
+++ /dev/null
@@ -1,231 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, see <http://www.gnu.org/licenses/>.
-
- @author: RaNaN
-"""
-
-from Queue import Queue
-from time import sleep, time
-from traceback import print_exc
-from sys import exc_clear
-from pycurl import error
-
-from module.plugins.Base import Fail, Retry, Abort
-from module.plugins.Hoster import Reconnect, SkipDownload
-from module.network.HTTPRequest import BadHeader
-
-from BaseThread import BaseThread
-
-class DownloadThread(BaseThread):
- """thread for downloading files from 'real' hoster plugins"""
-
- def __init__(self, manager):
- """Constructor"""
- BaseThread.__init__(self, manager)
-
- self.queue = Queue() # job queue
- self.active = None
-
- self.start()
-
- def run(self):
- """run method"""
- pyfile = None
-
- while True:
- del pyfile
- self.active = self.queue.get()
- pyfile = self.active
-
- if self.active == "quit":
- self.active = None
- self.m.threads.remove(self)
- return True
-
- try:
- if not pyfile.hasPlugin(): continue
- #this pyfile was deleted while queuing
-
- pyfile.plugin.checkForSameFiles(starting=True)
- self.log.info(_("Download starts: %s" % pyfile.name))
-
- # start download
- self.core.addonManager.downloadPreparing(pyfile)
- pyfile.plugin.preprocessing(self)
-
- self.log.info(_("Download finished: %s") % pyfile.name)
- self.core.addonManager.downloadFinished(pyfile)
- self.core.files.checkPackageFinished(pyfile)
-
- except NotImplementedError:
- self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname)
- pyfile.setStatus("failed")
- pyfile.error = "Plugin does not work"
- self.clean(pyfile)
- continue
-
- except Abort:
- try:
- self.log.info(_("Download aborted: %s") % pyfile.name)
- except:
- pass
-
- pyfile.setStatus("aborted")
-
- self.clean(pyfile)
- continue
-
- except Reconnect:
- self.queue.put(pyfile)
- #pyfile.req.clearCookies()
-
- while self.m.reconnecting.isSet():
- sleep(0.5)
-
- continue
-
- except Retry, e:
- reason = e.args[0]
- self.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason})
- self.queue.put(pyfile)
- continue
- except Fail, e:
- msg = e.args[0]
-
- # TODO: activate former skipped downloads
-
- if msg == "offline":
- pyfile.setStatus("offline")
- self.log.warning(_("Download is offline: %s") % pyfile.name)
- elif msg == "temp. offline":
- pyfile.setStatus("temp. offline")
- self.log.warning(_("Download is temporary offline: %s") % pyfile.name)
- else:
- pyfile.setStatus("failed")
- self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg})
- pyfile.error = msg
-
- self.core.addonManager.downloadFailed(pyfile)
- self.clean(pyfile)
- continue
-
- except error, e:
- if len(e.args) == 2:
- code, msg = e.args
- else:
- code = 0
- msg = e.args
-
- self.log.debug("pycurl exception %s: %s" % (code, msg))
-
- if code in (7, 18, 28, 52, 56):
- self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry."))
- wait = time() + 60
-
- pyfile.waitUntil = wait
- pyfile.setStatus("waiting")
- while time() < wait:
- sleep(1)
- if pyfile.abort:
- break
-
- if pyfile.abort:
- self.log.info(_("Download aborted: %s") % pyfile.name)
- pyfile.setStatus("aborted")
-
- self.clean(pyfile)
- else:
- self.queue.put(pyfile)
-
- continue
-
- else:
- pyfile.setStatus("failed")
- self.log.error("pycurl error %s: %s" % (code, msg))
- if self.core.debug:
- print_exc()
- self.writeDebugReport(pyfile.plugin.__name__, pyfile)
-
- self.core.addonManager.downloadFailed(pyfile)
-
- self.clean(pyfile)
- continue
-
- except SkipDownload, e:
- pyfile.setStatus("skipped")
-
- self.log.info(_("Download skipped: %(name)s due to %(plugin)s")
- % {"name": pyfile.name, "plugin": e.message})
-
- self.clean(pyfile)
-
- self.core.files.checkPackageFinished(pyfile)
-
- self.active = False
- self.core.files.save()
-
- continue
-
-
- except Exception, e:
- if isinstance(e, BadHeader) and e.code == 500:
- pyfile.setStatus("temp. offline")
- self.log.warning(_("Download is temporary offline: %s") % pyfile.name)
- pyfile.error = _("Internal Server Error")
-
- else:
- pyfile.setStatus("failed")
- self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)})
- pyfile.error = str(e)
-
- if self.core.debug:
- print_exc()
- self.writeDebugReport(pyfile.plugin.__name__, pyfile)
-
- self.core.addonManager.downloadFailed(pyfile)
- self.clean(pyfile)
- continue
-
- finally:
- self.core.files.save()
- pyfile.checkIfProcessed()
- exc_clear()
-
-
- #pyfile.plugin.req.clean()
-
- self.active = False
- pyfile.finishIfDone()
- self.core.files.save()
-
- def getProgress(self):
- if self.active:
- return self.active.getProgressInfo()
-
-
- def put(self, job):
- """assign a job to the thread"""
- self.queue.put(job)
-
- def clean(self, pyfile):
- """ set thread inactive and release pyfile """
- self.active = False
- pyfile.release()
-
- def stop(self):
- """stops the thread"""
- self.put("quit")
diff --git a/module/threads/InfoThread.py b/module/threads/InfoThread.py
deleted file mode 100644
index bf5bb5777..000000000
--- a/module/threads/InfoThread.py
+++ /dev/null
@@ -1,168 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-from time import time
-from traceback import print_exc
-
-from module.Api import LinkStatus
-from module.utils.packagetools import parseNames
-from module.utils import has_method, accumulate
-
-from BaseThread import BaseThread
-
-class InfoThread(BaseThread):
- def __init__(self, manager, data, pid=-1, rid=-1):
- """Constructor"""
- BaseThread.__init__(self, manager)
-
- self.data = data
- self.pid = pid # package id
- # [ .. (name, plugin) .. ]
-
- self.rid = rid #result id
-
- self.cache = [] #accumulated data
-
- self.start()
-
- def run(self):
- """run method"""
-
- plugins = accumulate(self.data)
- crypter = {}
-
- # filter out crypter plugins
- for name in self.m.core.pluginManager.getPlugins("crypter"):
- if name in plugins:
- crypter[name] = plugins[name]
- del plugins[name]
-
- #directly write to database
- if self.pid > -1:
- for pluginname, urls in plugins.iteritems():
- plugin = self.m.core.pluginManager.getPluginModule(pluginname)
- klass = self.m.core.pluginManager.getPluginClass(pluginname)
- if has_method(klass, "getInfo"):
- self.fetchForPlugin(pluginname, klass, urls, self.updateDB)
- self.m.core.files.save()
- elif has_method(plugin, "getInfo"):
- self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead")
- self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
- self.m.core.files.save()
-
- else: #post the results
- for name, urls in crypter:
- #attach container content
- try:
- data = self.decrypt(name, urls)
- except:
- print_exc()
- self.m.log.error("Could not decrypt container.")
- data = []
-
- accumulate(data, plugins)
-
- self.m.infoResults[self.rid] = {}
-
- for pluginname, urls in plugins.iteritems():
- plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
- klass = getattr(plugin, pluginname)
- if has_method(klass, "getInfo"):
- self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
- #force to process cache
- if self.cache:
- self.updateResult(pluginname, [], True)
- elif has_method(plugin, "getInfo"):
- self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead")
- self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
- #force to process cache
- if self.cache:
- self.updateResult(pluginname, [], True)
- else:
- #generate default result
- result = [(url, 0, 3, url) for url in urls]
-
- self.updateResult(pluginname, result, True)
-
- self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {}
-
- self.m.timestamp = time() + 5 * 60
-
-
- def updateDB(self, plugin, result):
- self.m.core.files.updateFileInfo(result, self.pid)
-
- def updateResult(self, plugin, result, force=False):
- #parse package name and generate result
- #accumulate results
-
- self.cache.extend(result)
-
- if len(self.cache) >= 20 or force:
- #used for package generating
- tmp = [(name, (url, LinkStatus(name, plugin, "unknown", status, int(size))))
- for name, size, status, url in self.cache]
-
- data = parseNames(tmp)
- result = {}
- for k, v in data.iteritems():
- for url, status in v:
- status.packagename = k
- result[url] = status
-
- self.m.setInfoResults(self.rid, result)
-
- self.cache = []
-
- def updateCache(self, plugin, result):
- self.cache.extend(result)
-
- def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
- try:
- result = [] #result loaded from cache
- process = [] #urls to process
- for url in urls:
- if url in self.m.infoCache:
- result.append(self.m.infoCache[url])
- else:
- process.append(url)
-
- if result:
- self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
- cb(pluginname, result)
-
- if process:
- self.m.log.debug("Run Info Fetching for %s" % pluginname)
- for result in plugin.getInfo(process):
- #result = [ .. (name, size, status, url) .. ]
- if not type(result) == list: result = [result]
-
- for res in result:
- self.m.infoCache[res[3]] = res
-
- cb(pluginname, result)
-
- self.m.log.debug("Finished Info Fetching for %s" % pluginname)
- except Exception, e:
- self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
- {"name": pluginname, "err": str(e)})
- if self.m.core.debug:
- print_exc()
-
- # generate default results
- if err:
- result = [(url, 0, 3, url) for url in urls]
- cb(pluginname, result)
-
-
- def decrypt(self, plugin, urls):
- self.m.log.debug("Pre decrypting %s" % plugin)
- klass = self.m.core.pluginManager.loadClass("crypter", plugin)
-
- # only decrypt files
- if has_method(klass, "decryptFile"):
- urls = klass.decrypt(urls)
- data, crypter = self.m.core.pluginManager.parseUrls(urls)
- return data
-
- return []
diff --git a/module/threads/ThreadManager.py b/module/threads/ThreadManager.py
deleted file mode 100644
index f67179d08..000000000
--- a/module/threads/ThreadManager.py
+++ /dev/null
@@ -1,313 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-###############################################################################
-# Copyright(c) 2008-2012 pyLoad Team
-# http://www.pyload.org
-#
-# This file is part of pyLoad.
-# pyLoad is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# Subjected to the terms and conditions in LICENSE
-#
-# @author: RaNaN
-###############################################################################
-
-from os.path import exists, join
-import re
-from subprocess import Popen
-from threading import Event, Lock
-from time import sleep, time
-from traceback import print_exc
-from random import choice
-
-import pycurl
-
-from module.datatypes.PyFile import PyFile
-from module.network.RequestFactory import getURL
-from module.utils import lock, uniqify
-from module.utils.fs import free_space
-
-from DecrypterThread import DecrypterThread
-from DownloadThread import DownloadThread
-from InfoThread import InfoThread
-
-class ThreadManager:
- """manages the download threads, assign jobs, reconnect etc"""
-
-
- def __init__(self, core):
- """Constructor"""
- self.core = core
- self.log = core.log
-
- self.threads = [] # thread list
- self.localThreads = [] #addon+decrypter threads
-
- self.pause = True
-
- self.reconnecting = Event()
- self.reconnecting.clear()
- self.downloaded = 0 #number of files downloaded since last cleanup
-
- self.lock = Lock()
-
- # some operations require to fetch url info from hoster, so we caching them so it wont be done twice
- # contains a timestamp and will be purged after timeout
- self.infoCache = {}
-
- # pool of ids for online check
- self.resultIDs = 0
-
- # threads which are fetching hoster results
- self.infoResults = {}
- # timeout for cache purge
- self.timestamp = 0
-
- pycurl.global_init(pycurl.GLOBAL_DEFAULT)
-
- for i in range(self.core.config.get("download", "max_downloads")):
- self.createThread()
-
-
- def createThread(self):
- """create a download thread"""
-
- thread = DownloadThread(self)
- self.threads.append(thread)
-
- def createInfoThread(self, data, pid):
- """ start a thread which fetches online status and other info's """
- self.timestamp = time() + 5 * 60
- if data: InfoThread(self, data, pid)
-
- @lock
- def createResultThread(self, data):
- """ creates a thread to fetch online status, returns result id """
- self.timestamp = time() + 5 * 60
-
- rid = self.resultIDs
- self.resultIDs += 1
-
- InfoThread(self, data, rid=rid)
-
- return rid
-
- @lock
- def createDecryptThread(self, data, pid):
- """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
- if data: DecrypterThread(self, data, pid)
-
-
- @lock
- def getInfoResult(self, rid):
- """returns result and clears it"""
- self.timestamp = time() + 5 * 60
-
- if rid in self.infoResults:
- data = self.infoResults[rid]
- self.infoResults[rid] = {}
- return data
- else:
- return {}
-
- @lock
- def setInfoResults(self, rid, result):
- self.infoResults[rid].update(result)
-
- def getActiveDownloads(self, user=None):
- # TODO: user context
- return [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)]
-
- def getProgressList(self, user=None):
- info = []
-
- # TODO: local threads can create multiple progresses
- for thread in self.threads + self.localThreads:
- # skip if not belong to current user
- if user and thread.user != user: continue
-
- progress = thread.getProgress()
- if progress: info.append(progress)
-
- return info
-
- def getActiveFiles(self):
- active = self.getActiveDownloads()
-
- for t in self.localThreads:
- active.extend(t.getActiveFiles())
-
- return active
-
- def processingIds(self):
- """get a id list of all pyfiles processed"""
- return [x.id for x in self.getActiveFiles()]
-
- def work(self):
- """run all task which have to be done (this is for repetetive call by core)"""
- try:
- self.tryReconnect()
- except Exception, e:
- self.log.error(_("Reconnect Failed: %s") % str(e) )
- self.reconnecting.clear()
- self.core.print_exc()
-
- self.checkThreadCount()
-
- try:
- self.assignJob()
- except Exception, e:
- self.log.warning("Assign job error", e)
- self.core.print_exc()
-
- sleep(0.5)
- self.assignJob()
- #it may be failed non critical so we try it again
-
- if (self.infoCache or self.infoResults) and self.timestamp < time():
- self.infoCache.clear()
- self.infoResults.clear()
- self.log.debug("Cleared Result cache")
-
- def tryReconnect(self):
- """checks if reconnect needed"""
-
- if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()):
- return False
-
- active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active]
-
- if not (0 < active.count(True) == len(active)):
- return False
-
- if not exists(self.core.config['reconnect']['method']):
- if exists(join(pypath, self.core.config['reconnect']['method'])):
- self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
- else:
- self.core.config["reconnect"]["activated"] = False
- self.log.warning(_("Reconnect script not found!"))
- return
-
- self.reconnecting.set()
-
- #Do reconnect
- self.log.info(_("Starting reconnect"))
-
- while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0:
- sleep(0.25)
-
- ip = self.getIP()
-
- self.core.addonManager.beforeReconnecting(ip)
-
- self.log.debug("Old IP: %s" % ip)
-
- try:
- reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE)
- except:
- self.log.warning(_("Failed executing reconnect script!"))
- self.core.config["reconnect"]["activated"] = False
- self.reconnecting.clear()
- if self.core.debug:
- print_exc()
- return
-
- reconn.wait()
- sleep(1)
- ip = self.getIP()
- self.core.addonManager.afterReconnecting(ip)
-
- self.log.info(_("Reconnected, new IP: %s") % ip)
-
- self.reconnecting.clear()
-
- def getIP(self):
- """retrieve current ip"""
- services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"),
- ("http://checkip.dyndns.org/",".*Current IP Address: (\S+)</body>.*")]
-
- ip = ""
- for i in range(10):
- try:
- sv = choice(services)
- ip = getURL(sv[0])
- ip = re.match(sv[1], ip).group(1)
- break
- except:
- ip = ""
- sleep(1)
-
- return ip
-
- def checkThreadCount(self):
- """checks if there is a need for increasing or reducing thread count"""
-
- if len(self.threads) == self.core.config.get("download", "max_downloads"):
- return True
- elif len(self.threads) < self.core.config.get("download", "max_downloads"):
- self.createThread()
- else:
- free = [x for x in self.threads if not x.active]
- if free:
- free[0].put("quit")
-
-
- def cleanPycurl(self):
- """ make a global curl cleanup (currently unused) """
- if self.processingIds():
- return False
- pycurl.global_cleanup()
- pycurl.global_init(pycurl.GLOBAL_DEFAULT)
- self.downloaded = 0
- self.log.debug("Cleaned up pycurl")
- return True
-
-
- def assignJob(self):
- """assign a job to a thread if possible"""
-
- if self.pause or not self.core.api.isTimeDownload(): return
-
- #if self.downloaded > 20:
- # if not self.cleanPyCurl(): return
-
- free = [x for x in self.threads if not x.active]
-
- inuse = [(x.active.pluginname, x.active.plugin.getDownloadLimit()) for x in self.threads if x.active and x.active.hasPlugin()]
- inuse = [(x[0], x[1], len([y for y in self.threads if y.active and y.active.pluginname == x[0]])) for x in inuse]
- occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]])))
-
- job = self.core.files.getJob(occ)
- if job:
- try:
- job.initPlugin()
- except Exception, e:
- self.log.critical(str(e))
- print_exc()
- job.setStatus("failed")
- job.error = str(e)
- job.release()
- return
-
- spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024
- if spaceLeft < self.core.config["general"]["min_free_space"]:
- self.log.warning(_("Not enough space left on device"))
- self.pause = True
-
- if free and not self.pause:
- thread = free[0]
- #self.downloaded += 1
- thread.put(job)
- else:
- #put job back
- if occ not in self.core.files.jobCache:
- self.core.files.jobCache[occ] = []
- self.core.files.jobCache[occ].append(job.id)
-
- def cleanup(self):
- """do global cleanup, should be called when finished with pycurl"""
- pycurl.global_cleanup()
diff --git a/module/threads/__init__.py b/module/threads/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/module/threads/__init__.py
+++ /dev/null