path: root/module/threads
diff options
Diffstat (limited to 'module/threads')
7 files changed, 971 insertions, 0 deletions
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..b6a552e0e
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from copy import copy
+from traceback import print_exc
+from BaseThread import BaseThread
+class AddonThread(BaseThread):
+ """thread for addons"""
+ def __init__(self, m, function, args, kwargs):
+ """Constructor"""
+ BaseThread.__init__(self, m)
+ self.f = function
+ self.args = args
+ self.kwargs = kwargs
+ = []
+ m.localThreads.append(self)
+ self.start()
+ def getActiveFiles(self):
+ return
+ def addActive(self, pyfile):
+ """ Adds a pyfile to active list and thus will be displayed on overview"""
+ if pyfile not in
+ def finishFile(self, pyfile):
+ if pyfile in
+ pyfile.finishIfDone()
+ def run(self): #TODO: approach via func_code
+ try:
+ try:
+ self.kwargs["thread"] = self
+ self.f(*self.args, **self.kwargs)
+ except TypeError, e:
+ #dirty method to filter out exceptions
+ if "unexpected keyword argument 'thread'" not in e.args[0]:
+ raise
+ del self.kwargs["thread"]
+ self.f(*self.args, **self.kwargs)
+ except Exception, e:
+ if hasattr(self.f, "im_self"):
+ addon = self.f.im_self
+ addon.logError(_("An Error occured"), e)
+ if self.m.core.debug:
+ print_exc()
+ self.writeDebugReport(addon.__name__, plugin=addon)
+ finally:
+ local = copy(
+ for x in local:
+ self.finishFile(x)
+ self.m.localThreads.remove(self) \ No newline at end of file
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..7a0ee5ee4
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import os
+import sys
+import locale
+from threading import Thread
+from time import strftime, gmtime
+from sys import exc_info
+from types import MethodType
+from pprint import pformat
+from traceback import format_exc
+from module.utils.fs import listdir, join, save_join, stat, exists
+class BaseThread(Thread):
+ """abstract base class for thread types"""
+ def __init__(self, manager):
+ Thread.__init__(self)
+ self.setDaemon(True)
+ self.m = manager #thread manager
+ self.core = manager.core
+ self.log = manager.core.log
+ def writeDebugReport(self, name, pyfile=None, plugin=None):
+ """ writes a debug report to disk """
+ dump_name = "" % (name, strftime("%d-%m-%Y_%H-%M-%S"))
+ if pyfile:
+ dump = self.getFileDump(pyfile)
+ else:
+ dump = self.getPluginDump(plugin)
+ try:
+ import zipfile
+ zip = zipfile.ZipFile(dump_name, "w")
+ if exists(join("tmp", name)):
+ for f in listdir(join("tmp", name)):
+ try:
+ # avoid encoding errors
+ zip.write(join("tmp", name, f), save_join(name, f))
+ except:
+ pass
+ info = zipfile.ZipInfo(save_join(name, "debug_Report.txt"), gmtime())
+ info.external_attr = 0644 << 16L # change permissions
+ zip.writestr(info, dump)
+ info = zipfile.ZipInfo(save_join(name, "system_Report.txt"), gmtime())
+ info.external_attr = 0644 << 16L
+ zip.writestr(info, self.getSystemDump())
+ zip.close()
+ if not stat(dump_name).st_size:
+ raise Exception("Empty Zipfile")
+ except Exception, e:
+ self.log.debug("Error creating zip file: %s" % e)
+ dump_name = dump_name.replace(".zip", ".txt")
+ f = open(dump_name, "wb")
+ f.write(dump)
+ f.close()
+"Debug Report written to %s" % dump_name)
+ return dump_name
+ def getFileDump(self, pyfile):
+ dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
+ self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc())
+ tb = exc_info()[2]
+ stack = []
+ while tb:
+ stack.append(tb.tb_frame)
+ tb = tb.tb_next
+ for frame in stack[1:]:
+ dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name,
+ frame.f_code.co_filename,
+ frame.f_lineno)
+ for key, value in frame.f_locals.items():
+ dump += "\t%20s = " % key
+ try:
+ dump += pformat(value) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+ del frame
+ del stack #delete it just to be sure...
+ dump += "\n\nPLUGIN OBJECT DUMP: \n\n"
+ for name in dir(pyfile.plugin):
+ attr = getattr(pyfile.plugin, name)
+ if not name.endswith("__") and type(attr) != MethodType:
+ dump += "\t%20s = " % name
+ try:
+ dump += pformat(attr) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+ dump += "\nPYFILE OBJECT DUMP: \n\n"
+ for name in dir(pyfile):
+ attr = getattr(pyfile, name)
+ if not name.endswith("__") and type(attr) != MethodType:
+ dump += "\t%20s = " % name
+ try:
+ dump += pformat(attr) + "\n"
+ except Exception, e:
+ dump += "<ERROR WHILE PRINTING VALUE> " + str(e) + "\n"
+ dump += "\n\nCONFIG: \n\n"
+ dump += pformat(self.m.core.config.values) + "\n"
+ return dump
+ def getPluginDump(self, plugin):
+ return ""
+ def getSystemDump(self):
+ return ""
+ def clean(self, pyfile):
+ """ set thread inactive and release pyfile """
+ = False
+ pyfile.release()
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..39448a620
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from time import sleep
+from traceback import print_exc
+from module.utils import uniqify
+from module.plugins.Base import Retry
+from module.plugins.Crypter import Package
+from BaseThread import BaseThread
+class DecrypterThread(BaseThread):
+ """thread for decrypting"""
+ def __init__(self, manager, data, pid):
+ """constructor"""
+ BaseThread.__init__(self, manager)
+ = data
+ = pid
+ self.start()
+ def run(self):
+ plugin_map = {}
+ for url, plugin in
+ if plugin in plugin_map:
+ plugin_map[plugin].append(url)
+ else:
+ plugin_map[plugin] = [url]
+ self.decrypt(plugin_map)
+ def decrypt(self, plugin_map):
+ pack = self.m.core.files.getPackage(
+ result = []
+ for name, urls in plugin_map.iteritems():
+ klass = self.m.core.pluginManager.loadClass("crypter", name)
+ plugin = klass(self.m.core, pack, pack.password)
+ plugin_result = []
+ try:
+ try:
+ plugin_result = plugin._decrypt(urls)
+ except Retry:
+ sleep(1)
+ plugin_result = plugin._decrypt(urls)
+ except Exception, e:
+ plugin.logError(_("Decrypting failed"), e)
+ if self.m.core.debug:
+ print_exc()
+ self.writeDebugReport(plugin.__name__, plugin=plugin)
+ plugin.logDebug("Decrypted", plugin_result)
+ result.extend(plugin_result)
+ result = uniqify(result)
+ pack_names = {}
+ urls = []
+ for p in result:
+ if isinstance(p, Package):
+ if in pack_names:
+ pack_names[].urls.extend(p.urls)
+ else:
+ pack_names[] = p
+ else:
+ urls.append(p)
+ if urls:
+"Decrypted %(count)d links into package %(name)s") % {"count": len(urls), "name":})
+ self.m.core.api.addFiles(, urls)
+ for p in pack_names.itervalues():
+ self.m.core.api.addPackage(, p.urls, pack.password)
+ if not result:
+"No links decrypted"))
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..0269b0660
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,223 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ See the GNU General Public License for more details.
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <>.
+ @author: RaNaN
+from Queue import Queue
+from time import sleep, time
+from traceback import print_exc
+from sys import exc_clear
+from pycurl import error
+from module.plugins.Base import Fail, Retry, Abort
+from module.plugins.Hoster import Reconnect, SkipDownload
+from import BadHeader
+from BaseThread import BaseThread
+class DownloadThread(BaseThread):
+ """thread for downloading files from 'real' hoster plugins"""
+ def __init__(self, manager):
+ """Constructor"""
+ BaseThread.__init__(self, manager)
+ self.queue = Queue() # job queue
+ = False
+ self.start()
+ def run(self):
+ """run method"""
+ pyfile = None
+ while True:
+ del pyfile
+ = self.queue.get()
+ pyfile =
+ if == "quit":
+ = False
+ self.m.threads.remove(self)
+ return True
+ try:
+ if not pyfile.hasPlugin(): continue
+ #this pyfile was deleted while queuing
+ pyfile.plugin.checkForSameFiles(starting=True)
+"Download starts: %s" %
+ # start download
+ self.core.addonManager.downloadPreparing(pyfile)
+ pyfile.plugin.preprocessing(self)
+"Download finished: %s") %
+ self.core.addonManager.downloadFinished(pyfile)
+ self.core.files.checkPackageFinished(pyfile)
+ except NotImplementedError:
+ self.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname)
+ pyfile.setStatus("failed")
+ pyfile.error = "Plugin does not work"
+ self.clean(pyfile)
+ continue
+ except Abort:
+ try:
+"Download aborted: %s") %
+ except:
+ pass
+ pyfile.setStatus("aborted")
+ self.clean(pyfile)
+ continue
+ except Reconnect:
+ self.queue.put(pyfile)
+ #pyfile.req.clearCookies()
+ while self.m.reconnecting.isSet():
+ sleep(0.5)
+ continue
+ except Retry, e:
+ reason = e.args[0]
+"Download restarted: %(name)s | %(msg)s") % {"name":, "msg": reason})
+ self.queue.put(pyfile)
+ continue
+ except Fail, e:
+ msg = e.args[0]
+ # TODO: activate former skipped downloads
+ if msg == "offline":
+ pyfile.setStatus("offline")
+ self.log.warning(_("Download is offline: %s") %
+ elif msg == "temp. offline":
+ pyfile.setStatus("temp. offline")
+ self.log.warning(_("Download is temporary offline: %s") %
+ else:
+ pyfile.setStatus("failed")
+ self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name":, "msg": msg})
+ pyfile.error = msg
+ self.core.addonManager.downloadFailed(pyfile)
+ self.clean(pyfile)
+ continue
+ except error, e:
+ if len(e.args) == 2:
+ code, msg = e.args
+ else:
+ code = 0
+ msg = e.args
+ self.log.debug("pycurl exception %s: %s" % (code, msg))
+ if code in (7, 18, 28, 52, 56):
+ self.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry."))
+ wait = time() + 60
+ pyfile.waitUntil = wait
+ pyfile.setStatus("waiting")
+ while time() < wait:
+ sleep(1)
+ if pyfile.abort:
+ break
+ if pyfile.abort:
+"Download aborted: %s") %
+ pyfile.setStatus("aborted")
+ self.clean(pyfile)
+ else:
+ self.queue.put(pyfile)
+ continue
+ else:
+ pyfile.setStatus("failed")
+ self.log.error("pycurl error %s: %s" % (code, msg))
+ if self.core.debug:
+ print_exc()
+ self.writeDebugReport(pyfile.plugin.__name__, pyfile)
+ self.core.addonManager.downloadFailed(pyfile)
+ self.clean(pyfile)
+ continue
+ except SkipDownload, e:
+ pyfile.setStatus("skipped")
+"Download skipped: %(name)s due to %(plugin)s")
+ % {"name":, "plugin": e.message})
+ self.clean(pyfile)
+ self.core.files.checkPackageFinished(pyfile)
+ = False
+ continue
+ except Exception, e:
+ if isinstance(e, BadHeader) and e.code == 500:
+ pyfile.setStatus("temp. offline")
+ self.log.warning(_("Download is temporary offline: %s") %
+ pyfile.error = _("Internal Server Error")
+ else:
+ pyfile.setStatus("failed")
+ self.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name":, "msg": str(e)})
+ pyfile.error = str(e)
+ if self.core.debug:
+ print_exc()
+ self.writeDebugReport(pyfile.plugin.__name__, pyfile)
+ self.core.addonManager.downloadFailed(pyfile)
+ self.clean(pyfile)
+ continue
+ finally:
+ pyfile.checkIfProcessed()
+ exc_clear()
+ #pyfile.plugin.req.clean()
+ = False
+ pyfile.finishIfDone()
+ def put(self, job):
+ """assign a job to the thread"""
+ self.queue.put(job)
+ def stop(self):
+ """stops the thread"""
+ self.put("quit")
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..a8a2c6e7e
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,168 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from time import time
+from traceback import print_exc
+from module.Api import LinkStatus
+from module.common.packagetools import parseNames
+from module.utils import has_method, accumulate
+from BaseThread import BaseThread
+class InfoThread(BaseThread):
+ def __init__(self, manager, data, pid=-1, rid=-1):
+ """Constructor"""
+ BaseThread.__init__(self, manager)
+ = data
+ = pid # package id
+ # [ .. (name, plugin) .. ]
+ self.rid = rid #result id
+ self.cache = [] #accumulated data
+ self.start()
+ def run(self):
+ """run method"""
+ plugins = accumulate(
+ crypter = {}
+ # filter out crypter plugins
+ for name in self.m.core.pluginManager.getPlugins("crypter"):
+ if name in plugins:
+ crypter[name] = plugins[name]
+ del plugins[name]
+ #directly write to database
+ if > -1:
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPluginModule(pluginname)
+ klass = self.m.core.pluginManager.getPluginClass(pluginname)
+ if has_method(klass, "getInfo"):
+ self.fetchForPlugin(pluginname, klass, urls, self.updateDB)
+ elif has_method(plugin, "getInfo"):
+ self.log.debug("Deprecated .getInfo() method on module level, use classmethod instead")
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
+ else: #post the results
+ for name, urls in crypter:
+ #attach container content
+ try:
+ data = self.decrypt(name, urls)
+ except:
+ print_exc()
+ self.m.log.error("Could not decrypt container.")
+ data = []
+ accumulate(data, plugins)
+ self.m.infoResults[self.rid] = {}
+ for pluginname, urls in plugins.iteritems():
+ plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
+ klass = getattr(plugin, pluginname)
+ if has_method(klass, "getInfo"):
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
+ #force to process cache
+ if self.cache:
+ self.updateResult(pluginname, [], True)
+ elif has_method(plugin, "getInfo"):
+ self.log.debug("Deprecated .getInfo() method on module level, use staticmethod instead")
+ self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
+ #force to process cache
+ if self.cache:
+ self.updateResult(pluginname, [], True)
+ else:
+ #generate default result
+ result = [(url, 0, 3, url) for url in urls]
+ self.updateResult(pluginname, result, True)
+ self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {}
+ self.m.timestamp = time() + 5 * 60
+ def updateDB(self, plugin, result):
+ self.m.core.files.updateFileInfo(result,
+ def updateResult(self, plugin, result, force=False):
+ #parse package name and generate result
+ #accumulate results
+ self.cache.extend(result)
+ if len(self.cache) >= 20 or force:
+ #used for package generating
+ tmp = [(name, (url, LinkStatus(name, plugin, "unknown", status, int(size))))
+ for name, size, status, url in self.cache]
+ data = parseNames(tmp)
+ result = {}
+ for k, v in data.iteritems():
+ for url, status in v:
+ status.packagename = k
+ result[url] = status
+ self.m.setInfoResults(self.rid, result)
+ self.cache = []
+ def updateCache(self, plugin, result):
+ self.cache.extend(result)
+ def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
+ try:
+ result = [] #result loaded from cache
+ process = [] #urls to process
+ for url in urls:
+ if url in self.m.infoCache:
+ result.append(self.m.infoCache[url])
+ else:
+ process.append(url)
+ if result:
+ self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
+ cb(pluginname, result)
+ if process:
+ self.m.log.debug("Run Info Fetching for %s" % pluginname)
+ for result in plugin.getInfo(process):
+ #result = [ .. (name, size, status, url) .. ]
+ if not type(result) == list: result = [result]
+ for res in result:
+ self.m.infoCache[res[3]] = res
+ cb(pluginname, result)
+ self.m.log.debug("Finished Info Fetching for %s" % pluginname)
+ except Exception, e:
+ self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
+ {"name": pluginname, "err": str(e)})
+ if self.m.core.debug:
+ print_exc()
+ # generate default results
+ if err:
+ result = [(url, 0, 3, url) for url in urls]
+ cb(pluginname, result)
+ def decrypt(self, plugin, urls):
+ self.m.log.debug("Pre decrypting %s" % plugin)
+ klass = self.m.core.pluginManager.loadClass("crypter", plugin)
+ # only decrypt files
+ if has_method(klass, "decryptFile"):
+ urls = klass.decrypt(urls)
+ data, crypter = self.m.core.pluginManager.parseUrls(urls)
+ return data
+ return []
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..e3407aac3
--- /dev/null
+++ b/module/threads/
@@ -0,0 +1,298 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright(c) 2008-2012 pyLoad Team
+# This file is part of pyLoad.
+# pyLoad is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# Subjected to the terms and conditions in LICENSE
+# @author: RaNaN
+from os.path import exists, join
+import re
+from subprocess import Popen
+from threading import Event, Lock
+from time import sleep, time
+from traceback import print_exc
+from random import choice
+import pycurl
+from module.datatypes import PyFile
+from import getURL
+from module.utils import lock, uniqify
+from module.utils.fs import free_space
+from DecrypterThread import DecrypterThread
+from DownloadThread import DownloadThread
+from InfoThread import InfoThread
+class ThreadManager:
+ """manages the download threads, assign jobs, reconnect etc"""
+ def __init__(self, core):
+ """Constructor"""
+ self.core = core
+ self.log = core.log
+ self.threads = [] # thread list
+ self.localThreads = [] #addon+decrypter threads
+ self.pause = True
+ self.reconnecting = Event()
+ self.reconnecting.clear()
+ self.downloaded = 0 #number of files downloaded since last cleanup
+ self.lock = Lock()
+ # some operations require to fetch url info from hoster, so we caching them so it wont be done twice
+ # contains a timestamp and will be purged after timeout
+ self.infoCache = {}
+ # pool of ids for online check
+ self.resultIDs = 0
+ # threads which are fetching hoster results
+ self.infoResults = {}
+ # timeout for cache purge
+ self.timestamp = 0
+ pycurl.global_init(pycurl.GLOBAL_DEFAULT)
+ for i in range(self.core.config.get("download", "max_downloads")):
+ self.createThread()
+ def createThread(self):
+ """create a download thread"""
+ thread = DownloadThread(self)
+ self.threads.append(thread)
+ def createInfoThread(self, data, pid):
+ """ start a thread which fetches online status and other info's """
+ self.timestamp = time() + 5 * 60
+ if data: InfoThread(self, data, pid)
+ @lock
+ def createResultThread(self, data):
+ """ creates a thread to fetch online status, returns result id """
+ self.timestamp = time() + 5 * 60
+ rid = self.resultIDs
+ self.resultIDs += 1
+ InfoThread(self, data, rid=rid)
+ return rid
+ @lock
+ def createDecryptThread(self, data, pid):
+ """ Start decrypting of entered data, all links in one package are accumulated to one thread."""
+ if data: DecrypterThread(self, data, pid)
+ @lock
+ def getInfoResult(self, rid):
+ """returns result and clears it"""
+ self.timestamp = time() + 5 * 60
+ if rid in self.infoResults:
+ data = self.infoResults[rid]
+ self.infoResults[rid] = {}
+ return data
+ else:
+ return {}
+ @lock
+ def setInfoResults(self, rid, result):
+ self.infoResults[rid].update(result)
+ def getActiveFiles(self):
+ active = [ for x in self.threads if and isinstance(, PyFile)]
+ for t in self.localThreads:
+ active.extend(t.getActiveFiles())
+ return active
+ def processingIds(self):
+ """get a id list of all pyfiles processed"""
+ return [ for x in self.getActiveFiles()]
+ def work(self):
+ """run all task which have to be done (this is for repetetive call by core)"""
+ try:
+ self.tryReconnect()
+ except Exception, e:
+ self.log.error(_("Reconnect Failed: %s") % str(e) )
+ self.reconnecting.clear()
+ if self.core.debug:
+ print_exc()
+ self.checkThreadCount()
+ try:
+ self.assignJob()
+ except Exception, e:
+ self.log.warning("Assign job error", e)
+ if self.core.debug:
+ print_exc()
+ sleep(0.5)
+ self.assignJob()
+ #it may be failed non critical so we try it again
+ if (self.infoCache or self.infoResults) and self.timestamp < time():
+ self.infoCache.clear()
+ self.infoResults.clear()
+ self.log.debug("Cleared Result cache")
+ def tryReconnect(self):
+ """checks if reconnect needed"""
+ if not (self.core.config["reconnect"]["activated"] and self.core.api.isTimeReconnect()):
+ return False
+ active = [ and for x in self.threads if]
+ if not (0 < active.count(True) == len(active)):
+ return False
+ if not exists(self.core.config['reconnect']['method']):
+ if exists(join(pypath, self.core.config['reconnect']['method'])):
+ self.core.config['reconnect']['method'] = join(pypath, self.core.config['reconnect']['method'])
+ else:
+ self.core.config["reconnect"]["activated"] = False
+ self.log.warning(_("Reconnect script not found!"))
+ return
+ self.reconnecting.set()
+ #Do reconnect
+"Starting reconnect"))
+ while [ for x in self.threads if].count(True) != 0:
+ sleep(0.25)
+ ip = self.getIP()
+ self.core.addonManager.beforeReconnecting(ip)
+ self.log.debug("Old IP: %s" % ip)
+ try:
+ reconn = Popen(self.core.config['reconnect']['method'], bufsize=-1, shell=True)#, stdout=subprocess.PIPE)
+ except:
+ self.log.warning(_("Failed executing reconnect script!"))
+ self.core.config["reconnect"]["activated"] = False
+ self.reconnecting.clear()
+ if self.core.debug:
+ print_exc()
+ return
+ reconn.wait()
+ sleep(1)
+ ip = self.getIP()
+ self.core.addonManager.afterReconnecting(ip)
+"Reconnected, new IP: %s") % ip)
+ self.reconnecting.clear()
+ def getIP(self):
+ """retrieve current ip"""
+ services = [("", "(\S+)"),
+ ("",".*Current IP Address: (\S+)</body>.*")]
+ ip = ""
+ for i in range(10):
+ try:
+ sv = choice(services)
+ ip = getURL(sv[0])
+ ip = re.match(sv[1], ip).group(1)
+ break
+ except:
+ ip = ""
+ sleep(1)
+ return ip
+ def checkThreadCount(self):
+ """checks if there is a need for increasing or reducing thread count"""
+ if len(self.threads) == self.core.config.get("download", "max_downloads"):
+ return True
+ elif len(self.threads) < self.core.config.get("download", "max_downloads"):
+ self.createThread()
+ else:
+ free = [x for x in self.threads if not]
+ if free:
+ free[0].put("quit")
+ def cleanPycurl(self):
+ """ make a global curl cleanup (currently unused) """
+ if self.processingIds():
+ return False
+ pycurl.global_cleanup()
+ pycurl.global_init(pycurl.GLOBAL_DEFAULT)
+ self.downloaded = 0
+ self.log.debug("Cleaned up pycurl")
+ return True
+ def assignJob(self):
+ """assign a job to a thread if possible"""
+ if self.pause or not self.core.api.isTimeDownload(): return
+ #if self.downloaded > 20:
+ # if not self.cleanPyCurl(): return
+ free = [x for x in self.threads if not]
+ inuse = [(, for x in self.threads if and]
+ inuse = [(x[0], x[1], len([y for y in self.threads if and == x[0]])) for x in inuse]
+ occ = tuple(sorted(uniqify([x[0] for x in inuse if 0 < x[1] <= x[2]])))
+ job = self.core.files.getJob(occ)
+ if job:
+ try:
+ job.initPlugin()
+ except Exception, e:
+ self.log.critical(str(e))
+ print_exc()
+ job.setStatus("failed")
+ job.error = str(e)
+ job.release()
+ return
+ spaceLeft = free_space(self.core.config["general"]["download_folder"]) / 1024 / 1024
+ if spaceLeft < self.core.config["general"]["min_free_space"]:
+ self.log.warning(_("Not enough space left on device"))
+ self.pause = True
+ if free and not self.pause:
+ thread = free[0]
+ #self.downloaded += 1
+ thread.put(job)
+ else:
+ #put job back
+ if occ not in self.core.files.jobCache:
+ self.core.files.jobCache[occ] = []
+ self.core.files.jobCache[occ].append(
+ def cleanup(self):
+ """do global cleanup, should be called when finished with pycurl"""
+ pycurl.global_cleanup()
diff --git a/module/threads/ b/module/threads/
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/module/threads/