summaryrefslogtreecommitdiffstats
path: root/tests/helper
diff options
context:
space:
mode:
Diffstat (limited to 'tests/helper')
-rw-r--r--tests/helper/BenchmarkTest.py66
-rw-r--r--tests/helper/PluginTester.py151
-rw-r--r--tests/helper/Stubs.py141
-rw-r--r--tests/helper/__init__.py0
-rw-r--r--tests/helper/config.py9
5 files changed, 367 insertions, 0 deletions
diff --git a/tests/helper/BenchmarkTest.py b/tests/helper/BenchmarkTest.py
new file mode 100644
index 000000000..d28c52959
--- /dev/null
+++ b/tests/helper/BenchmarkTest.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+
+from time import time
+
+
+class BenchmarkTest:
+
+ bench = []
+ results = {}
+
+ @classmethod
+ def timestamp(cls, name, a):
+ t = time()
+ r = cls.results.get(name, [])
+ r.append((t-a) * 1000)
+ cls.results[name] = r
+
+ @classmethod
+ def benchmark(cls, n=1):
+
+ print "Benchmarking %s" % cls.__name__
+ print
+
+ for i in range(n):
+ cls.collect_results()
+
+ if "setUpClass" in cls.results:
+ cls.bench.insert(0, "setUpClass")
+
+ if "tearDownClass" in cls.results:
+ cls.bench.append("tearDownClass")
+
+ length = str(max([len(k) for k in cls.bench]) + 1)
+ total = 0
+
+ for k in cls.bench:
+ v = cls.results[k]
+
+ if len(v) > 1:
+ print ("%" + length +"s: %s | average: %.2f ms") % (k, ", ".join(["%.2f" % x for x in v]), sum(v)/len(v))
+ total += sum(v)/len(v)
+ else:
+ print ("%" + length +"s: %.2f ms") % (k, v[0])
+ total += v[0]
+
+ print "\ntotal: %.2f ms" % total
+
+
+ @classmethod
+ def collect_results(cls):
+ if hasattr(cls, "setUpClass"):
+ a = time()
+ cls.setUpClass()
+ cls.timestamp("setUpClass", a)
+
+ obj = cls()
+
+ for f in cls.bench:
+ a = time()
+ getattr(obj, "test_" + f)()
+ cls.timestamp(f, a)
+
+ if hasattr(cls, "tearDownClass"):
+ a = time()
+ cls.tearDownClass()
+ cls.timestamp("tearDownClass", a) \ No newline at end of file
diff --git a/tests/helper/PluginTester.py b/tests/helper/PluginTester.py
new file mode 100644
index 000000000..9312eb7bf
--- /dev/null
+++ b/tests/helper/PluginTester.py
@@ -0,0 +1,151 @@
+# -*- coding: utf-8 -*-
+
+from unittest import TestCase
+from os import makedirs, remove
+from os.path import exists, join, expanduser
+from shutil import move
+from sys import exc_clear, exc_info
+from logging import log, DEBUG
+from time import sleep, time
+from random import randint
+from glob import glob
+
+from pycurl import LOW_SPEED_TIME, FORM_FILE
+from json import loads
+
+from Stubs import Thread, Core, noop
+
+from pyload.network.RequestFactory import getRequest, getURL
+from pyload.plugins.Hoster import Hoster, Abort, Fail
+
+def _wait(self):
+ """ waits the time previously set """
+ self.waiting = True
+
+ waittime = self.pyfile.waitUntil - time()
+ log(DEBUG, "waiting %ss" % waittime)
+
+ if self.wantReconnect and waittime > 300:
+ raise Fail("Would wait for reconnect %ss" % waittime)
+ elif waittime > 300:
+ raise Fail("Would wait %ss" % waittime)
+
+ while self.pyfile.waitUntil > time():
+ sleep(1)
+ if self.pyfile.abort: raise Abort
+
+ self.waiting = False
+ self.pyfile.setStatus("starting")
+
+Hoster.wait = _wait
+
+
+def decryptCaptcha(self, url, get={}, post={}, cookies=False, forceUser=False, imgtype='jpg',
+ result_type='textual'):
+ img = self.load(url, get=get, post=post, cookies=cookies)
+
+ id = ("%.2f" % time())[-6:].replace(".", "")
+ temp_file = open(join("tmp", "tmpCaptcha_%s_%s.%s" % (self.__name__, id, imgtype)), "wb")
+ temp_file.write(img)
+ temp_file.close()
+
+ Ocr = self.core.pluginManager.loadClass("captcha", self.__name__)
+
+ if Ocr:
+ log(DEBUG, "Using tesseract for captcha")
+ sleep(randint(3000, 5000) / 1000.0)
+ if self.pyfile.abort: raise Abort
+
+ ocr = Ocr()
+ result = ocr.get_captcha(temp_file.name)
+ else:
+ log(DEBUG, "Using ct for captcha")
+ # put username and passkey into two lines in ct.conf
+ conf = join(expanduser("~"), "ct.conf")
+ if not exists(conf): raise Exception("CaptchaTrader config %s not found." % conf)
+ f = open(conf, "rb")
+ req = getRequest()
+
+ #raise timeout threshold
+ req.c.setopt(LOW_SPEED_TIME, 80)
+
+ try:
+ json = req.load("http://captchatrader.com/api/submit",
+ post={"api_key": "9f65e7f381c3af2b076ea680ae96b0b7",
+ "username": f.readline().strip(),
+ "password": f.readline().strip(),
+ "value": (FORM_FILE, temp_file.name),
+ "type": "file"}, multipart=True)
+ finally:
+ f.close()
+ req.close()
+
+ response = loads(json)
+ log(DEBUG, str(response))
+ result = response[1]
+
+ self.cTask = response[0]
+
+ return result
+
+Hoster.decryptCaptcha = decryptCaptcha
+
+
+def respond(ticket, value):
+ conf = join(expanduser("~"), "ct.conf")
+ f = open(conf, "rb")
+ try:
+ getURL("http://captchatrader.com/api/respond",
+ post={"is_correct": value,
+ "username": f.readline().strip(),
+ "password": f.readline().strip(),
+ "ticket": ticket})
+ except Exception, e :
+ print "CT Exception:", e
+ log(DEBUG, str(e))
+ finally:
+ f.close()
+
+
+
+def invalidCaptcha(self):
+ log(DEBUG, "Captcha invalid")
+ if self.cTask:
+ respond(self.cTask, 0)
+
+Hoster.invalidCaptcha = invalidCaptcha
+
+def correctCaptcha(self):
+ log(DEBUG, "Captcha correct")
+ if self.cTask:
+ respond(self.cTask, 1)
+
+Hoster.correctCaptcha = correctCaptcha
+
+Hoster.checkForSameFiles = noop
+
+class PluginTester(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.core = Core()
+ name = "%s.%s" % (cls.__module__, cls.__name__)
+ for f in glob(join(name, "debug_*")):
+ remove(f)
+
+ # Copy debug report to attachment dir for jenkins
+ @classmethod
+ def tearDownClass(cls):
+ name = "%s.%s" % (cls.__module__, cls.__name__)
+ if not exists(name): makedirs(name)
+ for f in glob("debug_*"):
+ move(f, join(name, f))
+
+ def setUp(self):
+ self.thread = Thread(self.core)
+ exc_clear()
+
+ def tearDown(self):
+ exc = exc_info()
+ if exc != (None, None, None):
+ debug = self.thread.writeDebugReport()
+ log(DEBUG, debug)
diff --git a/tests/helper/Stubs.py b/tests/helper/Stubs.py
new file mode 100644
index 000000000..2c356ba3e
--- /dev/null
+++ b/tests/helper/Stubs.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+
+import sys
+from os.path import abspath, dirname, join
+from time import strftime
+from traceback import format_exc
+from collections import defaultdict
+
+sys.path.append(abspath(join(dirname(__file__), "..", "..", "pyload", "lib")))
+sys.path.append(abspath(join(dirname(__file__), "..", "..")))
+
+import __builtin__
+
+from pyload.Api import Role
+from pyload.datatypes.User import User
+from pyload.datatypes.PyPackage import PyPackage
+from pyload.threads.BaseThread import BaseThread
+from pyload.config.ConfigParser import ConfigParser
+from pyload.network.RequestFactory import RequestFactory
+from pyload.PluginManager import PluginManager
+from pyload.utils.JsEngine import JsEngine
+
+from logging import log, DEBUG, INFO, WARN, ERROR
+
+
+# Do nothing
+def noop(*args, **kwargs):
+ pass
+
+ConfigParser.save = noop
+
+class LogStub:
+ def debug(self, *args):
+ log(DEBUG, *args)
+
+ def info(self, *args):
+ log(INFO, *args)
+
+ def error(self, *args):
+ log(ERROR, *args)
+
+ def warning(self, *args):
+ log(WARN, *args)
+
+
+class NoLog:
+ def debug(self, *args):
+ pass
+
+ def info(self, *args):
+ pass
+
+ def error(self, *args):
+ log(ERROR, *args)
+
+ def warning(self, *args):
+ log(WARN, *args)
+
+
+class Core:
+ def __init__(self):
+ self.log = NoLog()
+
+ self.api = self.core = self
+ self.threadManager = self
+ self.debug = True
+ self.captcha = True
+ self.config = ConfigParser()
+ self.pluginManager = PluginManager(self)
+ self.requestFactory = RequestFactory(self)
+ __builtin__.pyreq = self.requestFactory
+ self.accountManager = AccountManager()
+ self.addonManager = AddonManager()
+ self.eventManager = self.evm = NoopClass()
+ self.interActionManager = self.im = NoopClass()
+ self.js = JsEngine()
+ self.cache = {}
+ self.packageCache = {}
+
+ self.statusMsg = defaultdict(lambda: "statusmsg")
+
+ self.log = LogStub()
+
+ def getServerVersion(self):
+ return "TEST_RUNNER on %s" % strftime("%d %h %Y")
+
+ def path(self, path):
+ return path
+
+ def updateLink(self, *args):
+ pass
+
+ def updatePackage(self, *args):
+ pass
+
+ def processingIds(self, *args):
+ return []
+
+ def getPackage(self, id):
+ return PyPackage(self, 0, "tmp", "tmp", "", "", 0, 0)
+
+ def print_exc(self):
+ log(ERROR, format_exc())
+
+
+class NoopClass:
+ def __getattr__(self, item):
+ return noop
+
+
+class AddonManager(NoopClass):
+ def activePlugins(self):
+ return []
+
+
+class AccountManager:
+ def getAccountForPlugin(self, name):
+ return None
+
+
+class Thread(BaseThread):
+ def __init__(self, core):
+ BaseThread.__init__(self, core)
+ self.plugin = None
+
+
+ def writeDebugReport(self):
+ if hasattr(self, "pyfile"):
+ dump = BaseThread.writeDebugReport(self, self.plugin.__name__, pyfile=self.pyfile)
+ else:
+ dump = BaseThread.writeDebugReport(self, self.plugin.__name__, plugin=self.plugin)
+
+ return dump
+
+__builtin__._ = lambda x: x
+__builtin__.pypath = abspath(join(dirname(__file__), "..", ".."))
+__builtin__.addonManager = AddonManager()
+__builtin__.pyreq = None
+
+adminUser = User(None, uid=0, role=Role.Admin)
+normalUser = User(None, uid=1, role=Role.User) \ No newline at end of file
diff --git a/tests/helper/__init__.py b/tests/helper/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/helper/__init__.py
diff --git a/tests/helper/config.py b/tests/helper/config.py
new file mode 100644
index 000000000..81f7e6768
--- /dev/null
+++ b/tests/helper/config.py
@@ -0,0 +1,9 @@
+# -*- coding: utf-8 -*-
+
+# Test configuration
+credentials = ("TestUser", "pwhere")
+webPort = 8921
+wsPort = 7558
+
+webAddress = "http://localhost:%d/api" % webPort
+wsAddress = "ws://localhost:%d/api" % wsPort \ No newline at end of file