diff options
Diffstat (limited to 'pyload/manager')
-rw-r--r-- | pyload/manager/Account.py | 196 | ||||
-rw-r--r-- | pyload/manager/Addon.py | 302 | ||||
-rw-r--r-- | pyload/manager/Captcha.py | 158 | ||||
-rw-r--r-- | pyload/manager/Event.py | 132 | ||||
-rw-r--r-- | pyload/manager/Plugin.py | 400 | ||||
-rw-r--r-- | pyload/manager/Remote.py | 85 | ||||
-rw-r--r-- | pyload/manager/Scheduler.py | 140 | ||||
-rw-r--r-- | pyload/manager/Thread.py | 313 | ||||
-rw-r--r-- | pyload/manager/__init__.py | 1 |
9 files changed, 1727 insertions, 0 deletions
diff --git a/pyload/manager/Account.py b/pyload/manager/Account.py new file mode 100644 index 000000000..b743ccabc --- /dev/null +++ b/pyload/manager/Account.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- + +from __future__ import with_statement + +import os +import shutil +import threading + +from pyload.manager.Event import AccountUpdateEvent +from pyload.utils import chmod, lock + + +ACC_VERSION = 1 + + +class AccountManager(object): + """manages all accounts""" + + def __init__(self, core): + """Constructor""" + + self.core = core + self.lock = threading.Lock() + + self.initPlugins() + self.saveAccounts() #: save to add categories to conf + + + def initPlugins(self): + self.accounts = {} #: key = ( plugin ) + self.plugins = {} + + self.initAccountPlugins() + self.loadAccounts() + + + def getAccountPlugin(self, plugin): + """get account instance for plugin or None if anonymous""" + try: + if plugin in self.accounts: + if plugin not in self.plugins: + klass = self.core.pluginManager.loadClass("accounts", plugin) + if klass: + self.plugins[plugin] = klass(self, self.accounts[plugin]) + else: #@NOTE: The account class no longer exists (blacklisted plugin). 
Skipping the account to avoid crash + raise + + return self.plugins[plugin] + else: + raise + except Exception: + return None + + + def getAccountPlugins(self): + """ get all account instances""" + + plugins = [] + for plugin in self.accounts.keys(): + plugins.append(self.getAccountPlugin(plugin)) + + return plugins + + + #---------------------------------------------------------------------- + + def loadAccounts(self): + """loads all accounts available""" + + try: + with open("accounts.conf", "a+") as f: + content = f.readlines() + version = content[0].split(":")[1].strip() if content else "" + + if not version or int(version) < ACC_VERSION: + shutil.copy("accounts.conf", "accounts.backup") + f.seek(0) + f.write("version: " + str(ACC_VERSION)) + + self.core.log.warning(_("Account settings deleted, due to new config format")) + return + + except IOError, e: + self.core.log.error(str(e)) + return + + plugin = "" + name = "" + + for line in content[1:]: + line = line.strip() + + if not line: + continue + if line.startswith("#"): + continue + if line.startswith("version"): + continue + + if line.endswith(":") and line.count(":") == 1: + plugin = line[:-1] + self.accounts[plugin] = {} + + elif line.startswith("@"): + try: + option = line[1:].split() + self.accounts[plugin][name]['options'][option[0]] = [] if len(option) < 2 else ([option[1]] if len(option) < 3 else option[1:]) + except Exception: + pass + + elif ":" in line: + name, sep, pw = line.partition(":") + self.accounts[plugin][name] = {"password": pw, "options": {}, "valid": True} + + + #---------------------------------------------------------------------- + + def saveAccounts(self): + """save all account information""" + + try: + with open("accounts.conf", "wb") as f: + f.write("version: " + str(ACC_VERSION) + "\n") + + for plugin, accounts in self.accounts.iteritems(): + f.write("\n") + f.write(plugin + ":\n") + + for name, data in accounts.iteritems(): + f.write("\n\t%s:%s\n" % (name, data['password']) ) + 
if data['options']: + for option, values in data['options'].iteritems(): + f.write("\t@%s %s\n" % (option, " ".join(values))) + + os.chmod(f.name, 0600) + + except Exception, e: + self.core.log.error(str(e)) + + + #---------------------------------------------------------------------- + + def initAccountPlugins(self): + """init names""" + for name in self.core.pluginManager.getAccountPlugins(): + self.accounts[name] = {} + + + @lock + def updateAccount(self, plugin, user, password=None, options={}): + """add or update account""" + if plugin in self.accounts: + p = self.getAccountPlugin(plugin) + updated = p.updateAccounts(user, password, options) + # since accounts is a ref in plugin self.accounts doesnt need to be updated here + + self.saveAccounts() + if updated: p.scheduleRefresh(user, force=False) + + + @lock + def removeAccount(self, plugin, user): + """remove account""" + + if plugin in self.accounts: + p = self.getAccountPlugin(plugin) + p.removeAccount(user) + + self.saveAccounts() + + + @lock + def getAccountInfos(self, force=True, refresh=False): + data = {} + + if refresh: + self.core.scheduler.addJob(0, self.core.accountManager.getAccountInfos) + force = False + + for p in self.accounts.keys(): + if self.accounts[p]: + p = self.getAccountPlugin(p) + if p: + data[p.__class__.__name__] = p.getAllAccounts(force) + else: #@NOTE: When an account has been skipped, p is None + data[p] = [] + else: + data[p] = [] + e = AccountUpdateEvent() + self.core.pullManager.addEvent(e) + return data + + + def sendChange(self): + e = AccountUpdateEvent() + self.core.pullManager.addEvent(e) diff --git a/pyload/manager/Addon.py b/pyload/manager/Addon.py new file mode 100644 index 000000000..a632111ea --- /dev/null +++ b/pyload/manager/Addon.py @@ -0,0 +1,302 @@ +# -*- coding: utf-8 -*- +# @author: RaNaN, mkaay +# @interface-version: 0.1 + +import __builtin__ + +import threading +import traceback +import types + +from pyload.Thread import AddonThread +from 
pyload.manager.Plugin import literal_eval +from pyload.utils import lock + + +class AddonManager(object): + """Manages addons, delegates and handles Events. + + Every plugin can define events, \ + but some very usefull events are called by the Core. + Contrary to overwriting addon methods you can use event listener, + which provides additional entry point in the control flow. + Only do very short tasks or use threads. + + **Known Events:** + Most addon methods exists as events. These are the additional known events. + + ======================= ============== ================================== + Name Arguments Description + ======================= ============== ================================== + download-preparing fid A download was just queued and will be prepared now. + download-start fid A plugin will immediately starts the download afterwards. + links-added links, pid Someone just added links, you are able to modify the links. + all_downloads-processed Every link was handled, pyload would idle afterwards. + all_downloads-finished Every download in queue is finished. + config-changed The config was changed via the api. + pluginConfigChanged The plugin config changed, due to api or internal process. + ======================= ============== ================================== + + | Notes: + | all_downloads-processed is *always* called before all_downloads-finished. + | config-changed is *always* called before pluginConfigChanged. 
+ """ + + def __init__(self, core): + self.core = core + + __builtin__.addonManager = self #: needed to let addons register themself + + self.plugins = [] + self.pluginMap = {} + self.methods = {} #: dict of names and list of methods usable by rpc + + self.events = {} #: contains events + + # registering callback for config event + self.core.config.pluginCB = types.MethodType(self.dispatchEvent, "pluginConfigChanged", basestring) #@TODO: Rename event pluginConfigChanged + + self.addEvent("pluginConfigChanged", self.manageAddon) + + self.lock = threading.RLock() + self.createIndex() + + + def try_catch(func): + + + def new(*args): + try: + return func(*args) + except Exception, e: + args[0].core.log.error(_("Error executing addon: %s") % e) + if args[0].core.debug: + traceback.print_exc() + + return new + + + def addRPC(self, plugin, func, doc): + plugin = plugin.rpartition(".")[2] + doc = doc.strip() if doc else "" + + if plugin in self.methods: + self.methods[plugin][func] = doc + else: + self.methods[plugin] = {func: doc} + + + def callRPC(self, plugin, func, args, parse): + if not args: + args = () + if parse: + args = tuple([literal_eval(x) for x in args]) + plugin = self.pluginMap[plugin] + f = getattr(plugin, func) + return f(*args) + + + def createIndex(self): + plugins = [] + + for type in ("addon", "hook"): + active = [] + deactive = [] + for pluginname in getattr(self.core.pluginManager, "%sPlugins" % type): + try: + if self.core.config.getPlugin("%s_%s" % (pluginname, type), "activated"): + pluginClass = self.core.pluginManager.loadClass(type, pluginname) + if not pluginClass: + continue + + plugin = pluginClass(self.core, self) + plugins.append(plugin) + self.pluginMap[pluginClass.__name__] = plugin + if plugin.isActivated(): + active.append(pluginClass.__name__) + else: + deactive.append(pluginname) + + except Exception: + self.core.log.warning(_("Failed activating %(name)s") % {"name": pluginname}) + if self.core.debug: + traceback.print_exc() + + 
self.core.log.info(_("Activated %ss: %s") % (type, ", ".join(sorted(active)))) + self.core.log.info(_("Deactivated %ss: %s") % (type, ", ".join(sorted(deactive)))) + + self.plugins = plugins + + + def manageAddon(self, plugin, name, value): + if name == "activated" and value: + self.activateAddon(plugin) + + elif name == "activated" and not value: + self.deactivateAddon(plugin) + + + def activateAddon(self, pluginname): + # check if already loaded + for inst in self.plugins: + if inst.__class__.__name__ == pluginname: + return + + pluginClass = self.core.pluginManager.loadClass("addon", pluginname) + + if not pluginClass: + return + + self.core.log.debug("Activate addon: %s" % pluginname) + + addon = pluginClass(self.core, self) + self.plugins.append(addon) + self.pluginMap[pluginClass.__name__] = addon + + addon.activate() + + + def deactivateAddon(self, pluginname): + for plugin in self.plugins: + if plugin.__class__.__name__ == pluginname: + addon = plugin + break + else: + return + + self.core.log.debug("Deactivate addon: %s" % pluginname) + + addon.deactivate() + + # remove periodic call + self.core.log.debug("Removed callback: %s" % self.core.scheduler.removeJob(addon.cb)) + + self.plugins.remove(addon) + del self.pluginMap[addon.__class__.__name__] + + + @try_catch + def coreReady(self): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.activate() + + self.dispatchEvent("addon-start") + + + @try_catch + def coreExiting(self): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.exit() + + self.dispatchEvent("addon-exit") + + + @lock + def downloadPreparing(self, pyfile): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.downloadPreparing(pyfile) + + self.dispatchEvent("download-preparing", pyfile) + + + @lock + def downloadFinished(self, pyfile): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.downloadFinished(pyfile) + + self.dispatchEvent("download-finished", pyfile) + + + @lock + @try_catch + 
def downloadFailed(self, pyfile): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.downloadFailed(pyfile) + + self.dispatchEvent("download-failed", pyfile) + + + @lock + def packageFinished(self, package): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.packageFinished(package) + + self.dispatchEvent("package-finished", package) + + + @lock + def beforeReconnecting(self, ip): + for plugin in self.plugins: + plugin.beforeReconnecting(ip) + + self.dispatchEvent("beforeReconnecting", ip) + + + @lock + def afterReconnecting(self, ip): + for plugin in self.plugins: + if plugin.isActivated(): + plugin.afterReconnecting(ip) + + self.dispatchEvent("afterReconnecting", ip) + + + def startThread(self, function, *args, **kwargs): + return AddonThread(self.core.threadManager, function, args, kwargs) + + + def activePlugins(self): + """ returns all active plugins """ + return [x for x in self.plugins if x.isActivated()] + + + def getAllInfo(self): + """returns info stored by addon plugins""" + info = {} + for name, plugin in self.pluginMap.iteritems(): + if plugin.info: + # copy and convert so str + info[name] = dict( + [(x, str(y) if not isinstance(y, basestring) else y) for x, y in plugin.info.iteritems()]) + return info + + + def getInfo(self, plugin): + info = {} + if plugin in self.pluginMap and self.pluginMap[plugin].info: + info = dict((x, str(y) if not isinstance(y, basestring) else y) + for x, y in self.pluginMap[plugin].info.iteritems()) + return info + + + def addEvent(self, event, func): + """Adds an event listener for event name""" + if event in self.events: + self.events[event].append(func) + else: + self.events[event] = [func] + + + def removeEvent(self, event, func): + """removes previously added event listener""" + if event in self.events: + self.events[event].remove(func) + + + def dispatchEvent(self, event, *args): + """dispatches event with args""" + if event in self.events: + for f in self.events[event]: + try: + f(*args) 
+ except Exception, e: + self.core.log.warning("Error calling event handler %s: %s, %s, %s" + % (event, f, args, str(e))) + if self.core.debug: + traceback.print_exc() diff --git a/pyload/manager/Captcha.py b/pyload/manager/Captcha.py new file mode 100644 index 000000000..271e6122b --- /dev/null +++ b/pyload/manager/Captcha.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# @author: RaNaN, mkaay + +import threading +import time +import traceback + +from pyload.utils import encode + + +class CaptchaManager(object): + + def __init__(self, core): + self.lock = threading.Lock() + self.core = core + self.tasks = [] #: task store, for outgoing tasks only + self.ids = 0 #: only for internal purpose + + + def newTask(self, img, format, file, result_type): + task = CaptchaTask(self.ids, img, format, file, result_type) + self.ids += 1 + return task + + + def removeTask(self, task): + self.lock.acquire() + if task in self.tasks: + self.tasks.remove(task) + self.lock.release() + + + def getTask(self): + self.lock.acquire() + for task in self.tasks: + if task.status in ("waiting", "shared-user"): + self.lock.release() + return task + self.lock.release() + return None + + + def getTaskByID(self, tid): + self.lock.acquire() + for task in self.tasks: + if task.id == str(tid): #: task ids are strings + self.lock.release() + return task + self.lock.release() + return None + + + def handleCaptcha(self, task, timeout=50): + cli = self.core.isClientConnected() + + if cli: #: client connected -> should solve the captcha + task.setWaiting(timeout) #: wait 50 sec for response + + for plugin in self.core.addonManager.activePlugins(): + try: + plugin.captchaTask(task) + except Exception: + if self.core.debug: + traceback.print_exc() + + if task.handler or cli: #: the captcha was handled + self.tasks.append(task) + return True + task.error = _("No Client connected for captcha decrypting") + return False + + +class CaptchaTask(object): + + def __init__(self, id, img, format, file, 
result_type='textual'): + self.id = str(id) + self.captchaImg = img + self.captchaFormat = format + self.captchaFile = file + self.captchaResultType = result_type + self.handler = [] #: the hook plugins that will take care of the solution + self.result = None + self.waitUntil = None + self.error = None #: error message + self.status = "init" + self.data = {} #: handler can store data here + + + def getCaptcha(self): + return self.captchaImg, self.captchaFormat, self.captchaResultType + + + def setResult(self, text): + if self.isTextual(): + self.result = text + if self.isPositional(): + try: + parts = text.split(',') + self.result = (int(parts[0]), int(parts[1])) + except Exception: + self.result = None + + + def getResult(self): + return encode(self.result) + + + def getStatus(self): + return self.status + + + def setWaiting(self, sec): + """ let the captcha wait secs for the solution """ + self.waitUntil = max(time.time() + sec, self.waitUntil) + self.status = "waiting" + + + def isWaiting(self): + if self.result or self.error or self.timedOut(): + return False + else: + return True + + + def isTextual(self): + """ returns if text is written on the captcha """ + return self.captchaResultType == 'textual' + + + def isPositional(self): + """ returns if user have to click a specific region on the captcha """ + return self.captchaResultType == 'positional' + + + def setWatingForUser(self, exclusive): + if exclusive: + self.status = "user" + else: + self.status = "shared-user" + + + def timedOut(self): + return time.time() > self.waitUntil + + + def invalid(self): + """ indicates the captcha was not correct """ + for x in self.handler: + x.captchaInvalid(self) + + + def correct(self): + for x in self.handler: + x.captchaCorrect(self) + + + def __str__(self): + return "<CaptchaTask '%s'>" % self.id diff --git a/pyload/manager/Event.py b/pyload/manager/Event.py new file mode 100644 index 000000000..919835984 --- /dev/null +++ b/pyload/manager/Event.py @@ -0,0 +1,132 @@ 
+# -*- coding: utf-8 -*- +# @author: mkaay + +import time + +from pyload.utils import uniqify + + +class PullManager(object): + + def __init__(self, core): + self.core = core + self.clients = [] + + + def newClient(self, uuid): + self.clients.append(Client(uuid)) + + + def clean(self): + for n, client in enumerate(self.clients): + if client.lastActive + 30 < time.time(): + del self.clients[n] + + + def getEvents(self, uuid): + events = [] + validUuid = False + for client in self.clients: + if client.uuid == uuid: + client.lastActive = time.time() + validUuid = True + while client.newEvents(): + events.append(client.popEvent().toList()) + break + if not validUuid: + self.newClient(uuid) + events = [ReloadAllEvent("queue").toList(), ReloadAllEvent("collector").toList()] + return uniqify(events) + + + def addEvent(self, event): + for client in self.clients: + client.addEvent(event) + + +class Client(object): + + def __init__(self, uuid): + self.uuid = uuid + self.lastActive = time.time() + self.events = [] + + + def newEvents(self): + return len(self.events) > 0 + + + def popEvent(self): + if not len(self.events): + return None + return self.events.pop(0) + + + def addEvent(self, event): + self.events.append(event) + + +class UpdateEvent(object): + + def __init__(self, itype, iid, destination): + assert itype == "pack" or itype == "file" + assert destination == "queue" or destination == "collector" + self.type = itype + self.id = iid + self.destination = destination + + + def toList(self): + return ["update", self.destination, self.type, self.id] + + +class RemoveEvent(object): + + def __init__(self, itype, iid, destination): + assert itype == "pack" or itype == "file" + assert destination == "queue" or destination == "collector" + self.type = itype + self.id = iid + self.destination = destination + + + def toList(self): + return ["remove", self.destination, self.type, self.id] + + +class InsertEvent(object): + + def __init__(self, itype, iid, after, destination): + 
assert itype == "pack" or itype == "file" + assert destination == "queue" or destination == "collector" + self.type = itype + self.id = iid + self.after = after + self.destination = destination + + + def toList(self): + return ["insert", self.destination, self.type, self.id, self.after] + + +class ReloadAllEvent(object): + + def __init__(self, destination): + assert destination == "queue" or destination == "collector" + self.destination = destination + + + def toList(self): + return ["reload", self.destination] + + +class AccountUpdateEvent(object): + + def toList(self): + return ["account"] + + +class ConfigUpdateEvent(object): + + def toList(self): + return ["config"] diff --git a/pyload/manager/Plugin.py b/pyload/manager/Plugin.py new file mode 100644 index 000000000..03bf9cacc --- /dev/null +++ b/pyload/manager/Plugin.py @@ -0,0 +1,400 @@ +# -*- coding: utf-8 -*- + +from __future__ import with_statement + +import os +import re +import sys +import traceback +import urllib + +import SafeEval + + +class PluginManager(object): + ROOT = "pyload.plugin." + USERROOT = "userplugins." 
+ TYPES = ["account", "addon", "container", "crypter", "extractor", "hook", "hoster", "internal", "ocr"] + + PATTERN = re.compile(r'__pattern\s*=\s*u?r("|\')([^"\']+)') + VERSION = re.compile(r'__version\s*=\s*("|\')([\d.]+)') + CONFIG = re.compile(r'__config\s*=\s*\[([^\]]+)', re.M) + DESC = re.compile(r'__description\s*=\s*("|"""|\')([^"\']+)') + + + def __init__(self, core): + self.core = core + + self.plugins = {} + self.createIndex() + + # register for import addon + sys.meta_path.append(self) + + + def loadTypes(self): + rootdir = os.path.join(pypath, "pyload", "plugin") + userdir = "userplugins" + + types = set().union(*[[d for d in os.listdir(p) if os.path.isdir(os.path.join(p, d))] + for p in (rootdir, userdir) if os.path.exists(p)]) + + if not types: + self.core.log.critical(_("No plugins found!")) + + self.TYPES = list(set(self.TYPES) | types) + + + def createIndex(self): + """create information for all plugins available""" + + sys.path.append(os.path.abspath("")) + + self.loadTypes() + + configs = [] + + for type in self.TYPES: + self.plugins[type] = self.parse(type) + setattr(self, "%sPlugins" % type, self.plugins[type]) + configs.extend("%s_%s" % (p, type) for p in self.plugins[type]) + + self.core.config.removeDeletedPlugins(configs) + + self.core.log.debug("Created index of plugins") + + + def parse(self, folder, rootplugins={}): + """ + returns dict with information + home contains parsed plugins from pyload. 
+ """ + + plugins = {} + + if rootplugins: + try: + pfolder = os.path.join("userplugins", folder) + if not os.path.exists(pfolder): + os.makedirs(pfolder) + + for ifile in (os.path.join("userplugins", "__init__.py"), + os.path.join(pfolder, "__init__.py")): + if not os.path.exists(ifile): + f = open(ifile, "wb") + f.close() + + except IOError, e: + self.core.log.critical(str(e)) + return rootplugins + + else: + pfolder = os.path.join(pypath, "pyload", "plugin", folder) + + for f in os.listdir(pfolder): + if os.path.isfile(os.path.join(pfolder, f)) and f.endswith(".py") and not f.startswith("_"): + + try: + with open(os.path.join(pfolder, f)) as data: + content = data.read() + + except IOError, e: + self.core.log.error(str(e)) + continue + + name = f[:-3] + if name[-1] == ".": + name = name[:-4] + + if not re.search("class\\s+%s\\(" % name, content): + self.core.log.error(_("invalid classname: %s ignored") % os.path.join(pfolder, f)) + + version = self.VERSION.findall(content) + if version: + version = float(version[0][1]) + else: + version = 0 + + if rootplugins and name in rootplugins: + if rootplugins[name]['version'] >= version: + continue + + plugins[name] = {} + plugins[name]['version'] = version + + module = f.replace(".pyc", "").replace(".py", "") + + # the plugin is loaded from user directory + plugins[name]['user'] = bool(rootplugins) + plugins[name]['name'] = module + + pattern = self.PATTERN.findall(content) + + if pattern: + pattern = pattern[0][1] + + try: + regexp = re.compile(pattern) + except Exception: + self.core.log.error(_("%s has a invalid pattern") % name) + pattern = r'^unmatchable$' + regexp = re.compile(pattern) + + plugins[name]['pattern'] = pattern + plugins[name]['re'] = regexp + + # internals have no config + if folder == "internal": + self.core.config.deleteConfig("internal") + continue + + config = self.CONFIG.findall(content) + if config: + try: + config = SafeEval.const_eval(config[0].strip().replace("\n", "").replace("\r", "")) + 
desc = self.DESC.findall(content) + desc = desc[0][1] if desc else "" + + if type(config[0]) == tuple: + config = [list(x) for x in config] + else: + config = [list(config)] + + if folder not in ("account", "internal") and not [True for item in config if item[0] == "activated"]: + config.insert(0, ["activated", "bool", "Activated", not folder in ("addon", "hook")]) + + self.core.config.addPluginConfig("%s_%s" % (name, folder), config, desc) + except Exception: + self.core.log.error("Invalid config in %s: %s" % (name, config)) + + elif folder in ("addon", "hook"): #: force config creation + desc = self.DESC.findall(content) + desc = desc[0][1] if desc else "" + config = (["activated", "bool", "Activated", False],) + + try: + self.core.config.addPluginConfig("%s_%s" % (name, folder), config, desc) + except Exception: + self.core.log.error("Invalid config in %s: %s" % (name, config)) + + if not rootplugins and plugins: #: Double check + plugins.update(self.parse(folder, plugins)) + + return plugins + + + def parseUrls(self, urls): + """parse plugins for given list of urls""" + + last = None + res = [] #: tupels of (url, plugintype, pluginname) + + for url in urls: + if type(url) not in (str, unicode, buffer): + continue + + url = urllib.unquote(url) + + if last and last[2]['re'].match(url): + res.append((url, last[0], last[1])) + continue + + for plugintype in self.TYPES: + m = None + for name, plugin in self.plugins[plugintype].iteritems(): + try: + if 'pattern' in plugin: + m = plugin['re'].match(url) + + except KeyError: + self.core.log.error(_("Plugin [%(type)s] %(name)s skipped due broken pattern") + % {'name': plugin['name'], 'type': plugintype}) + + if m: + res.append((url, plugintype, name)) + last = (plugintype, name, plugin) + break + if m: + break + else: + res.append((url, "internal", "BasePlugin")) + print res + return res + + + def findPlugin(self, type, name): + if isinstance(type, tuple): + for typ in type: + if name in self.plugins[typ]: + return 
(self.plugins[typ][name], typ) + + if isinstance(type, tuple) or type not in self.plugins or name not in self.plugins[type]: + self.core.log.warning(_("Plugin [%(type)s] %(name)s not found | Using plugin: [internal] BasePlugin") + % {'name': name, 'type': type}) + return self.internalPlugins['BasePlugin'] + + else: + return self.plugins[type][name] + + + def getPlugin(self, type, name, original=False): + """return plugin module from hoster|decrypter|container""" + plugin = self.findPlugin(type, name) + + if plugin is None: + return {} + + if "new_module" in plugin and not original: + return plugin['new_module'] + else: + return self.loadModule(type, name) + + + def getPluginName(self, type, name): + """ used to obtain new name if other plugin was injected""" + plugin = self.findPlugin(type, name) + + if plugin is None: + return "" + + if "new_name" in plugin: + return plugin['new_name'] + + return name + + + def loadModule(self, type, name): + """ Returns loaded module for plugin + + :param type: plugin type, subfolder of pyload.plugins + :param name: + """ + plugins = self.plugins[type] + + if name in plugins: + if "module" in plugins[name]: + return plugins[name]['module'] + + try: + module = __import__(self.ROOT + "%s.%s" % (type, plugins[name]['name']), globals(), locals(), + plugins[name]['name']) + + except Exception, e: + self.core.log.error(_("Error importing plugin: [%(type)s] %(name)s (v%(version).2f) | %(errmsg)s") + % {'name': name, 'type': type, 'version': plugins[name]['version'], "errmsg": str(e)}) + if self.core.debug: + traceback.print_exc() + + else: + plugins[name]['module'] = module # : cache import, maybe unneeded + + self.core.log.debug(_("Loaded plugin: [%(type)s] %(name)s (v%(version).2f)") + % {'name': name, 'type': type, 'version': plugins[name]['version']}) + return module + + + def loadClass(self, type, name): + """Returns the class of a plugin with the same name""" + module = self.loadModule(type, name) + if module: + return 
getattr(module, name) + else: + return None + + + def getAccountPlugins(self): + """return list of account plugin names""" + return self.accountPlugins.keys() + + + def find_module(self, fullname, path=None): + # redirecting imports if necesarry + if fullname.startswith(self.ROOT) or fullname.startswith(self.USERROOT): #: seperate pyload plugins + if fullname.startswith(self.USERROOT): user = 1 + else: user = 0 #: used as bool and int + + split = fullname.split(".") + if len(split) != 4 - user: + return + type, name = split[2 - user:4 - user] + + if type in self.plugins and name in self.plugins[type]: + # userplugin is a newer version + if not user and self.plugins[type][name]['user']: + return self + # imported from userdir, but pyloads is newer + if user and not self.plugins[type][name]['user']: + return self + + + def load_module(self, name, replace=True): + if name not in sys.modules: #: could be already in modules + if replace: + if self.ROOT in name: + newname = name.replace(self.ROOT, self.USERROOT) + else: + newname = name.replace(self.USERROOT, self.ROOT) + else: + newname = name + + base, plugin = newname.rsplit(".", 1) + + self.core.log.debug("Redirected import %s -> %s" % (name, newname)) + + module = __import__(newname, globals(), locals(), [plugin]) + # inject under new an old name + sys.modules[name] = module + sys.modules[newname] = module + + return sys.modules[name] + + + def reloadPlugins(self, type_plugins): + """ reload and reindex plugins """ + if not type_plugins: + return None + + self.core.log.debug("Request reload of plugins: %s" % type_plugins) + + reloaded = [] + + as_dict = {} + for t, n in type_plugins: + if t in as_dict: + as_dict[t].append(n) + else: + as_dict[t] = [n] + + for type in as_dict.iterkeys(): + if type in ("addon", "internal"): # : do not reload them because would cause to much side effects + self.core.log.debug("Skipping reload for plugins from type: %(type)s" % {'type': type}) + continue + + for plugin in as_dict[type]: 
+ if plugin in self.plugins[type] and "module" in self.plugins[type][plugin]: + self.core.log.debug(_("Reloading plugin: [%(type)s] %(name)s") % {'name': plugin, 'type': type}) + + try: + reload(self.plugins[type][plugin]['module']) + + except Exception, e: + self.core.log.error(_("Error when reloading plugin: [%(type)s] %(name)s") % {'name': plugin, 'type': type}, e) + continue + + else: + reloaded.append((type, plugin)) + + # index creation + self.plugins[type] = self.parse(type) + setattr(self, "%sPlugins" % type, self.plugins[type]) + + if "account" in as_dict: #: accounts needs to be reloaded + self.core.accountManager.initPlugins() + self.core.scheduler.addJob(0, self.core.accountManager.getAccountInfos) + + return reloaded #: return a list of the plugins successfully reloaded + + + def reloadPlugin(self, type_plugin): + """ reload and reindex ONE plugin """ + return bool(self.reloadPlugins(type_plugin)) diff --git a/pyload/manager/Remote.py b/pyload/manager/Remote.py new file mode 100644 index 000000000..6023cdcfd --- /dev/null +++ b/pyload/manager/Remote.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# @author: RaNaN + +import threading +import traceback + + +class BackendBase(threading.Thread): + + def __init__(self, manager): + threading.Thread.__init__(self) + self.m = manager + self.core = manager.core + self.enabled = True + self.running = False + + + def run(self): + self.running = True + try: + self.serve() + except Exception, e: + self.core.log.error(_("Remote backend error: %s") % e) + if self.core.debug: + traceback.print_exc() + finally: + self.running = False + + + def setup(self, host, port): + pass + + + def checkDeps(self): + return True + + + def serve(self): + pass + + + def shutdown(self): + pass + + + def stop(self): + self.enabled = False #: set flag and call shutdowm message, so thread can react + self.shutdown() + + +class RemoteManager(object): + available = [] + + + def __init__(self, core): + self.core = core + self.backends = [] + + 
if self.core.remote: + self.available.append("ThriftBackend") + # else: + # self.available.append("SocketBackend") + + + def startBackends(self): + host = self.core.config.get("remote", "listenaddr") + port = self.core.config.get("remote", "port") + + for b in self.available: + klass = getattr(__import__("pyload.remote.%s" % b, globals(), locals(), [b], -1), b) + backend = klass(self) + if not backend.checkDeps(): + continue + try: + backend.setup(host, port) + self.core.log.info(_("Starting %(name)s: %(addr)s:%(port)s") % {"name": b, "addr": host, "port": port}) + except Exception, e: + self.core.log.error(_("Failed loading backend %(name)s | %(error)s") % {"name": b, "error": str(e)}) + if self.core.debug: + traceback.print_exc() + else: + backend.start() + self.backends.append(backend) + + port += 1 diff --git a/pyload/manager/Scheduler.py b/pyload/manager/Scheduler.py new file mode 100644 index 000000000..b82768aff --- /dev/null +++ b/pyload/manager/Scheduler.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# @author: mkaay + +import heapq +import threading +import time + + +class AlreadyCalled(Exception): + pass + + +class Deferred(object): + + def __init__(self): + self.call = [] + self.result = () + + + def addCallback(self, f, *cargs, **ckwargs): + self.call.append((f, cargs, ckwargs)) + + + def callback(self, *args, **kwargs): + if self.result: + raise AlreadyCalled + self.result = (args, kwargs) + for f, cargs, ckwargs in self.call: + args += tuple(cargs) + kwargs.update(ckwargs) + f(*args ** kwargs) + + +class Scheduler(object): + + def __init__(self, core): + self.core = core + + self.queue = PriorityQueue() + + + def addJob(self, t, call, args=[], kwargs={}, threaded=True): + d = Deferred() + t += time.time() + j = Job(t, call, args, kwargs, d, threaded) + self.queue.put((t, j)) + return d + + + def removeJob(self, d): + """ + :param d: defered object + :return: if job was deleted + """ + index = -1 + + for i, j in enumerate(self.queue): + if 
j[1].deferred == d: + index = i + + if index >= 0: + del self.queue[index] + return True + + return False + + + def work(self): + while True: + t, j = self.queue.get() + if not j: + break + else: + if t <= time.time(): + j.start() + else: + self.queue.put((t, j)) + break + + +class Job(object): + + def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True): + self.time = float(time) + self.call = call + self.args = args + self.kwargs = kwargs + self.deferred = deferred + self.threaded = threaded + + + def run(self): + ret = self.call(*self.args, **self.kwargs) + if self.deferred is None: + return + else: + self.deferred.callback(ret) + + + def start(self): + if self.threaded: + t = threading.Thread(target=self.run) + t.setDaemon(True) + t.start() + else: + self.run() + + +class PriorityQueue(object): + """ a non blocking priority queue """ + + def __init__(self): + self.queue = [] + self.lock = threading.Lock() + + + def __iter__(self): + return iter(self.queue) + + + def __delitem__(self, key): + del self.queue[key] + + + def put(self, element): + self.lock.acquire() + heapq.heappush(self.queue, element) + self.lock.release() + + + def get(self): + """ return element or None """ + self.lock.acquire() + try: + el = heapq.heappop(self.queue) + return el + except IndexError: + return None, None + finally: + self.lock.release() diff --git a/pyload/manager/Thread.py b/pyload/manager/Thread.py new file mode 100644 index 000000000..b80c5594a --- /dev/null +++ b/pyload/manager/Thread.py @@ -0,0 +1,313 @@ +# -*- coding: utf-8 -*- +# @author: RaNaN + +import os +import random +import re +import subprocess +import threading +import time +import traceback + +import pycurl + +from pyload.Datatype import PyFile +from pyload.Thread import DecrypterThread, DownloadThread, InfoThread +from pyload.network.RequestFactory import getURL +from pyload.utils import freeSpace, lock + + +class ThreadManager(object): + """manages the download threads, assign jobs, 
reconnect etc""" + + def __init__(self, core): + """Constructor""" + self.core = core + + self.threads = [] #: thread list + self.localThreads = [] #: addon+decrypter threads + + self.pause = True + + self.reconnecting = threading.Event() + self.reconnecting.clear() + self.downloaded = 0 #: number of files downloaded since last cleanup + + self.lock = threading.Lock() + + # some operations require to fetch url info from hoster, so we caching them so it wont be done twice + # contains a timestamp and will be purged after timeout + self.infoCache = {} + + # pool of ids for online check + self.resultIDs = 0 + + # threads which are fetching hoster results + self.infoResults = {} + # timeout for cache purge + self.timestamp = 0 + + pycurl.global_init(pycurl.GLOBAL_DEFAULT) + + for _i in xrange(0, self.core.config.get("download", "max_downloads")): + self.createThread() + + + def createThread(self): + """create a download thread""" + + thread = DownloadThread(self) + self.threads.append(thread) + + + def createInfoThread(self, data, pid): + """ + start a thread whichs fetches online status and other infos + data = [ .. () .. 
] + """ + self.timestamp = time.time() + 5 * 60 + + InfoThread(self, data, pid) + + + @lock + def createResultThread(self, data, add=False): + """ creates a thread to fetch online status, returns result id """ + self.timestamp = time.time() + 5 * 60 + + rid = self.resultIDs + self.resultIDs += 1 + + InfoThread(self, data, rid=rid, add=add) + + return rid + + + @lock + def getInfoResult(self, rid): + """returns result and clears it""" + self.timestamp = time.time() + 5 * 60 + + if rid in self.infoResults: + data = self.infoResults[rid] + self.infoResults[rid] = {} + return data + else: + return {} + + + @lock + def setInfoResults(self, rid, result): + self.infoResults[rid].update(result) + + + def getActiveFiles(self): + active = [x.active for x in self.threads if x.active and isinstance(x.active, PyFile)] + + for t in self.localThreads: + active.extend(t.getActiveFiles()) + + return active + + + def processingIds(self): + """get a id list of all pyfiles processed""" + return [x.id for x in self.getActiveFiles()] + + + def work(self): + """run all task which have to be done (this is for repetivive call by core)""" + try: + self.tryReconnect() + except Exception, e: + self.core.log.error(_("Reconnect Failed: %s") % str(e)) + self.reconnecting.clear() + if self.core.debug: + traceback.print_exc() + self.checkThreadCount() + + try: + self.assignJob() + except Exception, e: + self.core.log.warning("Assign job error", e) + if self.core.debug: + traceback.print_exc() + + time.sleep(0.5) + self.assignJob() + # it may be failed non critical so we try it again + + if (self.infoCache or self.infoResults) and self.timestamp < time.time(): + self.infoCache.clear() + self.infoResults.clear() + self.core.log.debug("Cleared Result cache") + + + #-------------------------------------------------------------------------- + + def tryReconnect(self): + """checks if reconnect needed""" + + if not (self.core.config.get("reconnect", "activated") and self.core.api.isTimeReconnect()): + 
return False + + active = [x.active.plugin.wantReconnect and x.active.plugin.waiting for x in self.threads if x.active] + + if not (0 < active.count(True) == len(active)): + return False + + if not os.path.exists(self.core.config.get("reconnect", "method")): + if os.path.exists(os.path.join(pypath, self.core.config.get("reconnect", "method"))): + self.core.config.set("reconnect", "method", os.path.join(pypath, self.core.config.get("reconnect", "method"))) + else: + self.core.config.set("reconnect", "activated", False) + self.core.log.warning(_("Reconnect script not found!")) + return + + self.reconnecting.set() + + # Do reconnect + self.core.log.info(_("Starting reconnect")) + + while [x.active.plugin.waiting for x in self.threads if x.active].count(True) != 0: + time.sleep(0.25) + + ip = self.getIP() + + self.core.addonManager.beforeReconnecting(ip) + + self.core.log.debug("Old IP: %s" % ip) + + try: + reconn = subprocess.Popen(self.core.config.get("reconnect", "method"), bufsize=-1, shell=True) # , stdout=subprocess.PIPE) + except Exception: + self.core.log.warning(_("Failed executing reconnect script!")) + self.core.config.set("reconnect", "activated", False) + self.reconnecting.clear() + if self.core.debug: + traceback.print_exc() + return + + reconn.wait() + time.sleep(1) + ip = self.getIP() + self.core.addonManager.afterReconnecting(ip) + + self.core.log.info(_("Reconnected, new IP: %s") % ip) + + self.reconnecting.clear() + + + def getIP(self): + """retrieve current ip""" + services = [("http://automation.whatismyip.com/n09230945.asp", "(\S+)"), + ("http://checkip.dyndns.org/", ".*Current IP Address: (\S+)</body>.*")] + + ip = "" + for _i in xrange(10): + try: + sv = random.choice(services) + ip = getURL(sv[0]) + ip = re.match(sv[1], ip).group(1) + break + except Exception: + ip = "" + time.sleep(1) + + return ip + + + #-------------------------------------------------------------------------- + + def checkThreadCount(self): + """checks if there are need for 
increasing or reducing thread count"""
+
+        if len(self.threads) == self.core.config.get("download", "max_downloads"):
+            return True
+        elif len(self.threads) < self.core.config.get("download", "max_downloads"):
+            self.createThread()
+        else:
+            free = [x for x in self.threads if not x.active]
+            if free:
+                free[0].put("quit")
+
+
+    def cleanPycurl(self):
+        """ make a global curl cleanup (currently ununused) """
+        if self.processingIds():
+            return False
+        pycurl.global_cleanup()
+        pycurl.global_init(pycurl.GLOBAL_DEFAULT)
+        self.downloaded = 0
+        self.core.log.debug("Cleaned up pycurl")
+        return True
+
+
+    #--------------------------------------------------------------------------
+
+    def assignJob(self):
+        """assing a job to a thread if possible"""
+
+        if self.pause or not self.core.api.isTimeDownload():
+            return
+
+        # if self.downloaded > 20:
+        #     if not self.cleanPyCurl():
+        #         return
+
+        free = [x for x in self.threads if not x.active]
+
+        inuse = set([((x.active.plugintype, x.active.pluginname), self.getLimit(x)) for x in self.threads if x.active and isinstance(x.active, PyFile) and x.active.hasPlugin() and x.active.plugin.account])
+        inuse = map(lambda x: ('.'.join(x[0]), x[1], len([y for y in self.threads if y.active and isinstance(y.active, PyFile) and y.active.plugintype == x[0][0] and y.active.pluginname == x[0][1]])), inuse)
+        onlimit = [x[0] for x in inuse if x[1] > 0 and x[2] >= x[1]]
+
+        occ = [x.active.plugintype + '.' 
+ x.active.pluginname for x in self.threads if x.active and isinstance(x.active, PyFile) and x.active.hasPlugin() and not x.active.plugin.multiDL] + onlimit
+
+        occ.sort()
+        occ = tuple(set(occ))
+        job = self.core.files.getJob(occ)
+        if job:
+            try:
+                job.initPlugin()
+            except Exception, e:
+                self.core.log.critical(str(e))
+                traceback.print_exc()
+                job.setStatus("failed")
+                job.error = str(e)
+                job.release()
+                return
+
+            if job.plugin.getPluginType() == "hoster":
+                spaceLeft = freeSpace(self.core.config.get("general", "download_folder")) / 1024 / 1024
+                if spaceLeft < self.core.config.get("general", "min_free_space"):
+                    self.core.log.warning(_("Not enough space left on device"))
+                    self.pause = True
+
+                if free and not self.pause:
+                    thread = free[0]
+                    # self.downloaded += 1
+
+                    thread.put(job)
+                else:
+                    # put job back
+                    if occ not in self.core.files.jobCache:
+                        self.core.files.jobCache[occ] = []
+                    self.core.files.jobCache[occ].append(job.id)
+
+                    # check for decrypt jobs
+                    job = self.core.files.getDecryptJob()
+                    if job:
+                        job.initPlugin()
+                        thread = DecrypterThread(self, job)
+
+            else:
+                thread = DecrypterThread(self, job)
+
+
+    def getLimit(self, thread):
+        limit = thread.active.plugin.account.getAccountData(thread.active.plugin.user)["options"].get("limitDL", ["0"])[0]
+        return int(limit)
+
+
+    def cleanup(self):
+        """do global cleanup, should be called when finished with pycurl"""
+        pycurl.global_cleanup()
diff --git a/pyload/manager/__init__.py b/pyload/manager/__init__.py
new file mode 100644
index 000000000..40a96afc6
--- /dev/null
+++ b/pyload/manager/__init__.py
@@ -0,0 +1 @@
+# -*- coding: utf-8 -*- |