diff options
Diffstat (limited to 'tests')
40 files changed, 1973 insertions, 68 deletions
| diff --git a/tests/CrypterPluginTester.py b/tests/CrypterPluginTester.py new file mode 100644 index 000000000..42585939e --- /dev/null +++ b/tests/CrypterPluginTester.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +from os.path import dirname, join +from nose.tools import nottest + +from logging import log, DEBUG + +from helper.Stubs import Core +from helper.PluginTester import PluginTester + +from pyload.plugins.Base import Fail +from pyload.utils import accumulate, to_int + +class CrypterPluginTester(PluginTester): +    @nottest +    def test_plugin(self, name, url, flag): + +        print "%s: %s" % (name, url.encode("utf8")) +        log(DEBUG, "%s: %s", name, url.encode("utf8")) + +        plugin = self.core.pluginManager.getPluginClass(name) +        p = plugin(self.core, None, "") +        self.thread.plugin = p + +        try: +            result = p._decrypt([url]) + +            if to_int(flag): +                assert to_int(flag) == len(result) + +        except Exception, e: +            if isinstance(e, Fail) and flag == "fail": +                pass +            else: +                raise + + +# setup methods + +c = Core() + +f = open(join(dirname(__file__), "crypterlinks.txt")) +links = [x.strip() for x in f.readlines()] +urls = [] +flags = {} + +for l in links: +    if not l or l.startswith("#"): continue +    if l.startswith("http"): +        if "||" in l: +            l, flag = l.split("||") +            flags[l] = flag + +        urls.append(l) + +h, crypter = c.pluginManager.parseUrls(urls) +plugins = accumulate(crypter) +for plugin, urls in plugins.iteritems(): + +    def meta_class(plugin): +        class _testerClass(CrypterPluginTester): +            pass +        _testerClass.__name__ = plugin +        return _testerClass + +    _testerClass = meta_class(plugin) + +    for i, url in enumerate(urls): +        def meta(plugin, url, flag, sig): +            def _test(self): +                self.test_plugin(plugin, url, flag) + +            _test.func_name = sig +            return _test + +        sig = "test_LINK%d" % i +        setattr(_testerClass, sig, meta(plugin, url, flags.get(url, None), sig)) +        print url + +    locals()[plugin] = _testerClass +    del _testerClass diff --git a/tests/HosterPluginTester.py b/tests/HosterPluginTester.py new file mode 100644 index 000000000..4f32ee9e7 --- /dev/null +++ b/tests/HosterPluginTester.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +from os.path import dirname +from logging import log, DEBUG +from hashlib import md5 +from time import time +from shutil import move + +from nose.tools import nottest + +from helper.Stubs import Core +from helper.parser import parse_config +from helper.PluginTester import PluginTester + +from pyload.datatypes.PyFile import PyFile, statusMap +from pyload.plugins.Base import Fail +from pyload.utils import accumulate +from pyload.utils.fs import save_join, join, exists, listdir, remove, stat + +DL_DIR = join("Downloads", "tmp") + + +class HosterPluginTester(PluginTester): +    files = {} + +    def setUp(self): +        PluginTester.setUp(self) +        for f in self.files: +            if exists(save_join(DL_DIR, f)): remove(save_join(DL_DIR, f)) + +        # folder for reports +        report = join("tmp", self.__class__.__name__) +        if exists(report): +            for f in listdir(report): +                remove(join(report, f)) + + +    @nottest +    def test_plugin(self, name, url, status): +        # Print to stdout to see whats going on +        print "%s: %s, %s" % (name, url.encode("utf8"), status) +        log(DEBUG, "%s: %s, %s", name, url.encode("utf8"), status) + +        # url and plugin should be only important thing +        pyfile = PyFile(self.core, -1, url, url, 0, 0, 0, 0, url, name, "", 0, 0, 0, 0) +        pyfile.initPlugin() + +        self.thread.pyfile = pyfile +        self.thread.plugin = pyfile.plugin + +        try: +            a = time() +            pyfile.plugin.preprocessing(self.thread) + +            log(DEBUG, "downloading took %ds" % (time() - a)) +            log(DEBUG, "size %d kb" % (pyfile.size / 1024)) + +            if status == "offline": +                raise Exception("No offline Exception raised.") + +            if pyfile.name not in self.files: +                raise Exception("Filename %s not recognized." % pyfile.name) + +            if not exists(save_join(DL_DIR, pyfile.name)): +                raise Exception("File %s does not exists." % pyfile.name) + +            hash = md5() +            f = open(save_join(DL_DIR, pyfile.name), "rb") +            while True: +                buf = f.read(4096) +                if not buf: break +                hash.update(buf) +            f.close() + +            if hash.hexdigest() != self.files[pyfile.name]: +                log(DEBUG, "Hash is %s" % hash.hexdigest()) + +                size = stat(f.name).st_size +                if size < 1024 * 1024 * 10: # 10MB +                    # Copy for debug report +                    log(DEBUG, "Downloaded file copied to report") +                    move(f.name, join("tmp", plugin, f.name)) + +                raise Exception("Hash does not match.") + +        except Exception, e: +            if isinstance(e, Fail) and status == "failed": +                pass +            elif isinstance(e, Fail) and status == "offline" and e.message == "offline": +                pass +            else: +                raise + +# setup methods +c = Core() + +sections = parse_config(join(dirname(__file__), "hosterlinks.txt")) + +for f in sections["files"]: +    name, hash = f.rsplit(" ", 1) +    HosterPluginTester.files[name] = str(hash) + +del sections["files"] + +urls = [] +status = {} + +for k, v in sections.iteritems(): +    if k not in statusMap: +        print "Unknown status %s" % k +    for url in v: +        urls.append(url) +        status[url] = k + +hoster, c = c.pluginManager.parseUrls(urls) + +plugins = accumulate(hoster) +for plugin, urls in plugins.iteritems(): +    # closure functions to retain local scope +    def meta_class(plugin): +        class _testerClass(HosterPluginTester): +            pass + +        _testerClass.__name__ = plugin +        return _testerClass + +    _testerClass = meta_class(plugin) + +    for i, url in enumerate(urls): +        def meta(__plugin, url, status, sig): +            def _test(self): +                self.test_plugin(__plugin, url, status) + +            _test.func_name = sig +            return _test + +        tmp_status = status.get(url) +        if tmp_status != "online": +            sig = "test_LINK%d_%s" % (i, tmp_status) +        else: +            sig = "test_LINK%d" % i + +        # set test method +        setattr(_testerClass, sig, meta(plugin, url, tmp_status, sig)) + +    #register class +    locals()[plugin] = _testerClass +    # remove from locals, or tested twice +    del _testerClass diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/__init__.py diff --git a/tests/api/ApiProxy.py b/tests/api/ApiProxy.py new file mode 100644 index 000000000..0da79a204 --- /dev/null +++ b/tests/api/ApiProxy.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- + + +from pyload.remote.apitypes_debug import classes, methods + +from tests.helper.config import credentials + +class ApiProxy: +    """ Proxy that does type checking on the api """ + +    def __init__(self, api, user=credentials[0], pw=credentials[1]): +        self.api = api +        self.user = user +        self.pw = pw + +        if user and pw is not None: +            self.api.login(user, pw) + +    def assert_type(self, result, type): +        if not type: return # void +        try: +            # Complex attribute +            if isinstance(type, tuple): +                # Optional result +                if type[0] is None: +                    # Only check if not None +                    if result is not None: self.assert_type(result, type[1]) + +                # List +                elif type[0] == list: +                    assert isinstance(result, list) +                    for item in result: +                        self.assert_type(item, type[1]) +                # Dict +                elif type[0] == dict: +                    assert isinstance(result, dict) +                    for k, v in result.iteritems(): +                        self.assert_type(k, type[1]) +                        self.assert_type(v, type[2]) + +            # Struct - Api class +            elif hasattr(result, "__name__") and result.__name__ in classes: +                for attr, atype in zip(result.__slots__, classes[result.__name__]): +                    self.assert_type(getattr(result, attr), atype) +            else: # simple object +                assert isinstance(result, type) +        except AssertionError: +            print "Assertion for %s as %s failed" % (result, type) +            raise + + +    def call(self, func, *args, **kwargs): +        result = getattr(self.api, func)(*args, **kwargs) +        self.assert_type(result, methods[func]) + +        return result + + +    def __getattr__(self, item): +        def call(*args, **kwargs): +            return self.call(item, *args, **kwargs) + +        return call + +if __name__ == "__main__": + +    from pyload.remote.JSONClient import JSONClient + +    api = ApiProxy(JSONClient(), "User", "test") +    api.getServerVersion()
\ No newline at end of file diff --git a/tests/api/ApiTester.py b/tests/api/ApiTester.py new file mode 100644 index 000000000..e9a185947 --- /dev/null +++ b/tests/api/ApiTester.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +from pyload.remote.JSONClient import JSONClient +from pyload.remote.WSClient import WSClient + +from tests.helper.config import webAddress, wsAddress + +from ApiProxy import ApiProxy + +class ApiTester: + +    tester= [] + +    @classmethod +    def register(cls, tester): +        cls.tester.append(tester) + +    @classmethod +    def get_methods(cls): +        """ All available methods for testing """ +        methods = [] +        for t in cls.tester: +            methods.extend(getattr(t, attr) for attr in dir(t) if attr.startswith("test_")) +        return methods + +    def __init__(self): +        ApiTester.register(self) +        self.api = None + +    def setApi(self, api): +        self.api = api + +    def enableJSON(self): +        self.api = ApiProxy(JSONClient(webAddress)) + +    def enableWS(self): +        self.api = ApiProxy(WSClient(wsAddress)) diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/api/__init__.py diff --git a/tests/api/test_JSONBackend.py b/tests/api/test_JSONBackend.py new file mode 100644 index 000000000..d13d6709f --- /dev/null +++ b/tests/api/test_JSONBackend.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- + +from nose.tools import raises, assert_equal + +from requests.auth import HTTPBasicAuth +import requests + +import json + +from pyload.remote.apitypes import Forbidden +from pyload.remote.JSONClient import JSONClient + +from tests.helper.config import credentials, webAddress + +class TestJSONBackend: + +    def setUp(self): +        self.client = JSONClient(webAddress) + +    def test_login(self): +        self.client.login(*credentials) +        self.client.getServerVersion() +        self.client.logout() + +    def test_wronglogin(self): +        ret = self.client.login("WrongUser", "wrongpw") +        assert ret is False + +    def test_httpauth(self): +        # cheap http auth +        ret = requests.get(webAddress + "/getServerVersion", auth=HTTPBasicAuth(*credentials)) +        assert_equal(ret.status_code, 200) +        assert ret.text + +    def test_jsonbody(self): +        payload = {'section': 'webinterface', 'option': 'port'} +        headers = {'content-type': 'application/json'} + +        ret = requests.get(webAddress + "/getConfigValue", headers=headers, +                           auth=HTTPBasicAuth(*credentials), data=json.dumps(payload)) + +        assert_equal(ret.status_code, 200) +        assert ret.text + +    @raises(Forbidden) +    def test_access(self): +        self.client.getServerVersion() + +    @raises(AttributeError) +    def test_unknown_method(self): +        self.client.login(*credentials) +        self.client.sdfdsg()
\ No newline at end of file diff --git a/tests/api/test_WebSocketBackend.py b/tests/api/test_WebSocketBackend.py new file mode 100644 index 000000000..a9288104f --- /dev/null +++ b/tests/api/test_WebSocketBackend.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- + +from nose.tools import raises + +from pyload.remote.apitypes import Forbidden +from pyload.remote.WSClient import WSClient + +from tests.helper.config import credentials, wsAddress + +class TestWebSocketBackend: + +    def setUp(self): +        self.client = WSClient(wsAddress) +        self.client.connect() + +    def tearDown(self): +        self.client.close() + +    def test_login(self): +        self.client.login(*credentials) +        self.client.getServerVersion() +        self.client.logout() + +    def test_wronglogin(self): +        ret = self.client.login("WrongUser", "wrongpw") +        assert ret is False + +    @raises(Forbidden) +    def test_access(self): +        self.client.getServerVersion() + +    @raises(AttributeError) +    def test_unknown_method(self): +        self.client.login(*credentials) +        self.client.sdfdsg() diff --git a/tests/api/test_api.py b/tests/api/test_api.py new file mode 100644 index 000000000..668470fe4 --- /dev/null +++ b/tests/api/test_api.py @@ -0,0 +1,50 @@ +from unittest import TestCase +from random import choice + +from pyload.Core import Core + +from ApiTester import ApiTester + + +class TestAPI(TestCase): +    """ +    Test all available testers randomly and on all backends +    """ +    _multiprocess_can_split_ = True +    core = None + +    #TODO: parallel testing +    @classmethod +    def setUpClass(cls): +        from test_noargs import TestNoArgs + +        cls.core = Core() +        cls.core.start(False, False, True) +        for Test in (TestNoArgs,): +            t = Test() +            t.enableJSON() +            t = Test() +            t.enableWS() +            t = Test() +            t.setApi(cls.core.api) + +        cls.methods = ApiTester.get_methods() + +    @classmethod +    def tearDownClass(cls): +        cls.core.shutdown() + +    def test_random(self, n=10000): +        for i in range(n): +            func = choice(self.methods) +            func() + +    def test_random2(self, n): +        self.test_random(n) + +    def test_random3(self, n): +        self.test_random(n) + +    def test_random4(self, n): +        self.test_random(n) + diff --git a/tests/api/test_noargs.py b/tests/api/test_noargs.py new file mode 100644 index 000000000..e84946e45 --- /dev/null +++ b/tests/api/test_noargs.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +import inspect + +from ApiTester import ApiTester + +from pyload.remote.apitypes import Iface + +IGNORE = ('quit', 'restart') + +class TestNoArgs(ApiTester): +    def setUp(self): +        self.enableJSON() + +# Setup test_methods dynamically, only these which require no arguments +for name in dir(Iface): +    if name.startswith("_") or name in IGNORE: continue + +    spec = inspect.getargspec(getattr(Iface, name)) +    if len(spec.args) == 1 and (not spec.varargs or len(spec.varargs) == 0): +        def meta_test(name): #retain local scope +            def test(self): +                getattr(self.api, name)() +            test.func_name = "test_%s" % name +            return test + +        setattr(TestNoArgs, "test_%s" % name, meta_test(name)) + +        del meta_test
\ No newline at end of file diff --git a/tests/clonedigger.sh b/tests/clonedigger.sh new file mode 100755 index 000000000..93c1cb323 --- /dev/null +++ b/tests/clonedigger.sh @@ -0,0 +1,2 @@ +#!/bin/bash +clonedigger -o cpd.xml --cpd-output --fast --ignore-dir=lib --ignore-dir=remote module diff --git a/tests/code_analysis.sh b/tests/code_analysis.sh new file mode 100755 index 000000000..e027bac2f --- /dev/null +++ b/tests/code_analysis.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +sloccount --duplicates --wide --details pyload > sloccount.sc + +echo "Running pep8" +pep8 pyload > pep8.txt + +echo "Running pylint" +pylint --reports=no pyload > pylint.txt || exit 0 + +#echo "Running pyflakes" +# pyflakes to pylint syntak +#find -name '*.py' |egrep -v '^.(/tests/|/pyload/lib)'|xargs pyflakes  > pyflakes.log || : +# Filter warnings and strip ./ from path +#cat pyflakes.log | awk -F\: '{printf "%s:%s: [E]%s\n", $1, $2, $3}' | grep -i -E -v "'_'|pypath|webinterface|pyreq|addonmanager" > pyflakes.txt +#sed -i 's/^.\///g' pyflakes.txt
\ No newline at end of file diff --git a/tests/config/pyload.conf.org b/tests/config/pyload.conf.org new file mode 100644 index 000000000..0a422f258 --- /dev/null +++ b/tests/config/pyload.conf.org @@ -0,0 +1,75 @@ +version: 2 + +[remote] +nolocalauth = False +activated = True +port = 7558 +listenaddr = 127.0.0.1 + +[log] +log_size = 100 +log_folder = Logs +file_log = False +log_count = 5 +log_rotate = True + +[permission] +group = users +change_dl = False +change_file = False +user = user +file = 0644 +change_group = False +folder = 0755 +change_user = False + +[general] +language = en +download_folder = Downloads +checksum = False +folder_per_package = True +debug_mode = True +min_free_space = 200 +renice = 0 + +[ssl] +cert = ssl.crt +activated = False +key = ssl.key + +[webinterface] +template = default +activated = True +prefix =  +server = builtin +host = 127.0.0.1 +https = False +port = 8921 + +[proxy] +username =  +proxy = False +address = localhost +password =  +type = http +port = 7070 + +[reconnect] +endTime = 0:00 +activated = False +method = ./reconnect.sh +startTime = 0:00 + +[download] +max_downloads = 3 +limit_speed = False +interface =  +skip_existing = False +max_speed = -1 +ipv6 = False +chunks = 3 + +[downloadTime] +start = 0:00 +end = 0:00 + diff --git a/tests/config/pyload.db.org b/tests/config/pyload.db.orgBinary files differ new file mode 100644 index 000000000..26af474c1 --- /dev/null +++ b/tests/config/pyload.db.org diff --git a/tests/crypterlinks.txt b/tests/crypterlinks.txt new file mode 100644 index 000000000..2873915c0 --- /dev/null +++ b/tests/crypterlinks.txt @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- + +# Crypter links, append ||fail or ||#N to mark error or number of expected results (single links+packages) + +http://www.ddlstorage.com/folder/EmkJPLTYJp||4 diff --git a/tests/helper/BenchmarkTest.py b/tests/helper/BenchmarkTest.py new file mode 100644 index 000000000..d28c52959 --- /dev/null +++ b/tests/helper/BenchmarkTest.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- + +from time import time + + +class BenchmarkTest: + +    bench = [] +    results = {} + +    @classmethod +    def timestamp(cls, name, a): +        t = time() +        r = cls.results.get(name, []) +        r.append((t-a) * 1000) +        cls.results[name] = r + +    @classmethod +    def benchmark(cls, n=1): + +        print "Benchmarking %s" % cls.__name__ +        print + +        for i in range(n): +            cls.collect_results() + +        if "setUpClass" in cls.results: +            cls.bench.insert(0, "setUpClass") + +        if "tearDownClass" in cls.results: +            cls.bench.append("tearDownClass") + +        length = str(max([len(k) for k in cls.bench]) + 1) +        total = 0 + +        for k in cls.bench: +            v = cls.results[k] + +            if len(v) > 1: +                print ("%" + length +"s: %s | average: %.2f ms") % (k, ", ".join(["%.2f" % x for x in v]), sum(v)/len(v)) +                total += sum(v)/len(v) +            else: +                print ("%" + length +"s: %.2f ms") % (k, v[0]) +                total += v[0] + +        print "\ntotal: %.2f ms" % total + + +    @classmethod +    def collect_results(cls): +        if hasattr(cls, "setUpClass"): +            a = time() +            cls.setUpClass() +            cls.timestamp("setUpClass", a) + +        obj = cls() + +        for f in cls.bench: +            a = time() +            getattr(obj, "test_" + f)() +            cls.timestamp(f, a) + +        if hasattr(cls, "tearDownClass"): +            a = time() +            cls.tearDownClass() +            cls.timestamp("tearDownClass", a)
\ No newline at end of file diff --git a/tests/helper/PluginTester.py b/tests/helper/PluginTester.py new file mode 100644 index 000000000..b3c93311c --- /dev/null +++ b/tests/helper/PluginTester.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase +from os import makedirs, remove +from os.path import exists, join, expanduser +from shutil import move +from sys import exc_clear, exc_info +from logging import log, DEBUG +from time import sleep, time +from glob import glob + +from pycurl import LOW_SPEED_TIME, FORM_FILE +from json import loads + +from Stubs import Thread, Core, noop + +from pyload.network.RequestFactory import getRequest +from pyload.plugins.Base import Abort, Fail +from pyload.plugins.Hoster import Hoster + +def _wait(self): +    """ waits the time previously set """ +    self.waiting = True + +    waittime = self.pyfile.waitUntil - time() +    log(DEBUG, "waiting %ss" % waittime) + +    if self.wantReconnect and waittime > 300: +        raise Fail("Would wait for reconnect %ss" % waittime) +    elif waittime > 300: +        raise Fail("Would wait %ss" % waittime) + +    while self.pyfile.waitUntil > time(): +        sleep(1) +        if self.pyfile.abort: raise Abort + +    self.waiting = False +    self.pyfile.setStatus("starting") + +Hoster.wait = _wait + + +def decryptCaptcha(self, url, get={}, post={}, cookies=False, forceUser=False, imgtype='jpg', +                   result_type='textual'): +    img = self.load(url, get=get, post=post, cookies=cookies) + +    id = ("%.2f" % time())[-6:].replace(".", "") +    temp_file = open(join("tmp", "tmpCaptcha_%s_%s.%s" % (self.__name__, id, imgtype)), "wb") +    temp_file.write(img) +    temp_file.close() + +    log(DEBUG, "Using ct for captcha") +    # put username and passkey into two lines in ct.conf +    conf = join(expanduser("~"), "ct.conf") +    if not exists(conf): raise Exception("CaptchaService config %s not found." % conf) +    f = open(conf, "rb") +    req = getRequest() + +    #raise timeout threshold +    req.c.setopt(LOW_SPEED_TIME, 300) + +    try: +        json = req.load("http://captchatrader.com/api/submit", +            post={"api_key": "9f65e7f381c3af2b076ea680ae96b0b7", +                  "username": f.readline().strip(), +                  "password": f.readline().strip(), +                  "value": (FORM_FILE, temp_file.name), +                  "type": "file"}, multipart=True) +    finally: +        f.close() +        req.close() + +    response = loads(json) +    log(DEBUG, str(response)) +    result = response[1] + +    self.cTask = response[0] + +    return result + +Hoster.decryptCaptcha = decryptCaptcha + +def invalidCaptcha(self): +    log(DEBUG, "Captcha invalid") + +Hoster.invalidCaptcha = invalidCaptcha + +def correctCaptcha(self): +    log(DEBUG, "Captcha correct") + +Hoster.correctCaptcha = correctCaptcha + +Hoster.checkForSameFiles = noop + +class PluginTester(TestCase): +    @classmethod +    def setUpClass(cls): +        cls.core = Core() +        name = "%s.%s" % (cls.__module__, cls.__name__) +        for f in glob(join(name, "debug_*")): +            remove(f) + +    # Copy debug report to attachment dir for jenkins +    @classmethod +    def tearDownClass(cls): +        name = "%s.%s" % (cls.__module__, cls.__name__) +        if not exists(name): makedirs(name) +        for f in glob("debug_*"): +            move(f, join(name, f)) + +    def setUp(self): +        self.thread = Thread(self.core) +        exc_clear() + +    def tearDown(self): +        exc = exc_info() +        if exc != (None, None, None): +            debug = self.thread.writeDebugReport() +            log(DEBUG, debug) diff --git a/tests/helper/Stubs.py b/tests/helper/Stubs.py new file mode 100644 index 000000000..81b7d8a09 --- /dev/null +++ b/tests/helper/Stubs.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- + +import sys +from os.path import abspath, dirname, join +from time import strftime +from traceback import format_exc +from collections import defaultdict + +sys.path.append(abspath(join(dirname(__file__), "..", "..", "pyload", "lib"))) +sys.path.append(abspath(join(dirname(__file__), "..", ".."))) + +import __builtin__ + +from pyload.Api import Role +from pyload.datatypes.User import User +from pyload.datatypes.PyPackage import PyPackage +from pyload.threads.BaseThread import BaseThread +from pyload.config.ConfigParser import ConfigParser +from pyload.network.RequestFactory import RequestFactory +from pyload.PluginManager import PluginManager +from pyload.utils.JsEngine import JsEngine + +from logging import log, DEBUG, INFO, WARN, ERROR + + +# Do nothing +def noop(*args, **kwargs): +    pass + + +ConfigParser.save = noop + + +class LogStub: +    def debug(self, *args): +        log(DEBUG, *args) + +    def info(self, *args): +        log(INFO, *args) + +    def error(self, *args): +        log(ERROR, *args) + +    def warning(self, *args): +        log(WARN, *args) + + +class NoLog: +    def debug(self, *args): +        pass + +    def info(self, *args): +        pass + +    def error(self, *args): +        log(ERROR, *args) + +    def warning(self, *args): +        log(WARN, *args) + + +class Core: +    def __init__(self): +        self.log = NoLog() + +        self.api = self.core = self +        self.threadManager = self +        self.debug = True +        self.captcha = True +        self.config = ConfigParser() +        self.pluginManager = PluginManager(self) +        self.requestFactory = RequestFactory(self) +        __builtin__.pyreq = self.requestFactory +        self.accountManager = AccountManager() +        self.addonManager = AddonManager() +        self.eventManager = self.evm = NoopClass() +        self.interactionManager = self.im = NoopClass() +        self.scheduler = NoopClass() +        self.js = JsEngine() +        self.cache = {} +        self.packageCache = {} + +        self.statusMsg = defaultdict(lambda: "statusmsg") + +        self.log = LogStub() + +    def getServerVersion(self): +        return "TEST_RUNNER on %s" % strftime("%d %h %Y") + +    def path(self, path): +        return path + +    def updateFile(self, *args): +        pass + +    def updatePackage(self, *args): +        pass + +    def processingIds(self, *args): +        return [] + +    def getPackage(self, *args): +        return PyPackage(self, 0, "tmp", "tmp", -1, 0, "", "", "", 0, "", 0, 0, 0) + +    def print_exc(self): +        log(ERROR, format_exc()) + + +class NoopClass: +    def __getattr__(self, item): +        return noop + + +class AddonManager(NoopClass): +    def activePlugins(self): +        return [] + + +class AccountManager: +    def getAccountForPlugin(self, name): +        return None + + +class Thread(BaseThread): +    def __init__(self, core): +        BaseThread.__init__(self, core) +        self.plugin = None + + +    def writeDebugReport(self): +        if hasattr(self, "pyfile"): +            dump = BaseThread.writeDebugReport(self, self.plugin.__name__, pyfile=self.pyfile) +        else: +            dump = BaseThread.writeDebugReport(self, self.plugin.__name__, plugin=self.plugin) + +        return dump + + +__builtin__._ = lambda x: x +__builtin__.pypath = abspath(join(dirname(__file__), "..", "..")) +__builtin__.addonManager = AddonManager() +__builtin__.pyreq = None + +adminUser = User(None, uid=0, role=Role.Admin) +normalUser = User(None, uid=1, role=Role.User)
\ No newline at end of file diff --git a/tests/helper/__init__.py b/tests/helper/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/helper/__init__.py diff --git a/tests/helper/config.py b/tests/helper/config.py new file mode 100644 index 000000000..81f7e6768 --- /dev/null +++ b/tests/helper/config.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +# Test configuration +credentials = ("TestUser", "pwhere") +webPort = 8921 +wsPort = 7558 + +webAddress = "http://localhost:%d/api" % webPort +wsAddress = "ws://localhost:%d/api" % wsPort
\ No newline at end of file diff --git a/tests/helper/parser.py b/tests/helper/parser.py new file mode 100644 index 000000000..5031ca7c3 --- /dev/null +++ b/tests/helper/parser.py @@ -0,0 +1,22 @@ + +import codecs + +def parse_config(path): +    f = codecs.open(path, "rb", "utf_8") +    result = {} + +    current_section = None +    for line in f.readlines(): +        line = line.strip() +        if not line or line.startswith("#"): +            continue + +        if line.startswith("["): +            current_section = line.replace("[", "").replace("]", "") +            result[current_section] = [] +        else: +            if not current_section: +                raise Exception("Line without section: %s" % line) +            result[current_section].append(line) + +    return result diff --git a/tests/hosterlinks.txt b/tests/hosterlinks.txt new file mode 100755 index 000000000..f717fee28 --- /dev/null +++ b/tests/hosterlinks.txt @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*-                                                                                                                                 + +# Valid files, with md5 hash +[files] + +# Two variants of filename with special chars +räándöóm.bin 5dde9e312311d964572f5a33a0992a2c +random (机会,ឱកស,ランダム,隨機,лчаный,cơhội,შანსი,కంħن↓∂ƒ_স€קสม®¢äöář_æžĐşiồй_ныứ&+).bin 5dde9e312311d964572f5a33a0992a2c +# One host changed the name to this, with one little difference +random (机会,ឱកស,ランダム,隨機,лчаный,cơhội,შანსი,కంħن↓∂ƒ_স€קสม®¢äöář_æžđşiồй_ныứ&+).bin 5dde9e312311d964572f5a33a0992a2c + +random100.bin f346c3ea47d8bfce3a12033129dec8ff + +[online] + +http://download.pyload.org/random%20(%e6%9c%ba%e4%bc%9a%2c%e1%9e%b1%e1%9e%80%e1%9e%9f%2c%e3%83%a9%e3%83%b3%e3%83%80%e3%83%a0%2c%e9%9a%a8%e6%a9%9f%2c%d0%bb%d1%87%d0%b0%d0%bd%d1%8b%d0%b9%2cc%c6%a1h%e1%bb%99i%2c%e1%83%a8%e1%83%90%e1%83%9c%e1%83%a1%e1%83%98%2c%e0%b0%95%e0%b0%82%c4%a7%d9%86%e2%86%93%e2%88%82%c6%92_%e0%a6%b8%e2%82%ac%d7%a7%e0%b8%aa%e0%b8%a1%c2%ae%c2%a2%c3%a4%c3%b6%c3%a1%c5%99_%c3%a6%c5%be%c4%90%c5%9fi%e1%bb%93%d0%b9_%d0%bd%d1%8b%e1%bb%a9%26%2b).bin + +http://www.putlocker.com/file/06DB5A3B9CBAD597 +http://www59.zippyshare.com/v/56973477/file.html +http://www.share-online.biz/dl/IAOREQPMWG +http://hbcyum.1fichier.com/ +http://bitshare.com/?f=5a1m0nia +http://uploaded.net/file/n4fhw1ol + +[offline]
\ No newline at end of file diff --git a/tests/manager/__init__.py b/tests/manager/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/manager/__init__.py diff --git a/tests/manager/test_accountManager.py b/tests/manager/test_accountManager.py new file mode 100644 index 000000000..1b328f892 --- /dev/null +++ b/tests/manager/test_accountManager.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase + +from tests.helper.Stubs import Core, adminUser, normalUser + +from pyload.database import DatabaseBackend +from pyload.AccountManager import AccountManager + + +class TestAccountManager(TestCase): +    @classmethod +    def setUpClass(cls): +        cls.core = Core() +        cls.db = DatabaseBackend(cls.core) +        cls.core.db = cls.db +        cls.db.setup() + +    @classmethod +    def tearDownClass(cls): +        cls.db.shutdown() + +    def setUp(self): +        self.db.purgeAccounts() +        self.manager = AccountManager(self.core) + +    def test_access(self): +        account = self.manager.updateAccount("Http", "User", "somepw", adminUser) + +        assert account is self.manager.updateAccount("Http", "User", "newpw", adminUser) +        self.assertEqual(account.password, "newpw") + +        assert self.manager.getAccount("Http", "User") is account +        assert self.manager.getAccount("Http", "User", normalUser) is None + +    def test_config(self): +        account = self.manager.updateAccount("Http", "User", "somepw", adminUser) +        info = account.toInfoData() + +        self.assertEqual(info.config[0].name, "domain") +        self.assertEqual(info.config[0].value, "") +        self.assertEqual(account.getConfig("domain"), "") + +        account.setConfig("domain", "df") + +        info = account.toInfoData() +        self.assertEqual(info.config[0].value, "df") + +        info.config[0].value = "new" + +        account.updateConfig(info.config) +        self.assertEqual(account.getConfig("domain"), "new") + + +    def test_shared(self): +        account = self.manager.updateAccount("Http", "User", "somepw", adminUser) + +        assert self.manager.selectAccount("Http", adminUser) is account +        assert account.loginname == "User" + +        assert self.manager.selectAccount("Something", adminUser) is None +        assert self.manager.selectAccount("Http", normalUser) is None + +        account.shared = True + +        assert self.manager.selectAccount("Http", normalUser) is account +        assert self.manager.selectAccount("sdf", normalUser) is None + + + diff --git a/tests/manager/test_configManager.py b/tests/manager/test_configManager.py new file mode 100644 index 000000000..ca572bf20 --- /dev/null +++ b/tests/manager/test_configManager.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase +from collections import defaultdict + +from nose.tools import raises + +from tests.helper.Stubs import Core, adminUser, normalUser + +from pyload.Api import InvalidConfigSection +from pyload.database import DatabaseBackend +from pyload.config.ConfigParser import ConfigParser +from pyload.config.ConfigManager import ConfigManager +from pyload.utils import primary_uid + +adminUser = primary_uid(adminUser) +normalUser = primary_uid(normalUser) + +class TestConfigManager(TestCase): + +    @classmethod +    def setUpClass(cls): +        cls.core = Core() +        cls.db = DatabaseBackend(cls.core) +        cls.core.db = cls.db +        cls.db.manager = cls.core +        cls.db.manager.statusMsg = defaultdict(lambda: "statusmsg") +        cls.parser = ConfigParser() +        cls.config = ConfigManager(cls.core, cls.parser) +        cls.db.setup() + +    def setUp(self): +        self.db.clearAllConfigs() +        # used by some tests, needs to be deleted +        self.config.delete("plugin", adminUser) + + +    def addConfig(self): +        self.config.addConfigSection("plugin", "Name", "desc", "something", +            [("value", "str", "label", "default")]) + +    def test_db(self): + +        self.db.saveConfig("plugin", "some value", 0) +        self.db.saveConfig("plugin", "some other value", 1) + +        assert self.db.loadConfig("plugin", 0) == "some value" +        assert self.db.loadConfig("plugin", 1) == "some other value" + +        d = self.db.loadAllConfigs() +        assert d[0]["plugin"] == "some value" +        assert self.db.loadConfigsForUser(0)["plugin"] == "some value" + +        self.db.deleteConfig("plugin", 0) + +        assert 0 not in self.db.loadAllConfigs() +        assert "plugin" not in self.db.loadConfigsForUser(0) + +        self.db.deleteConfig("plugin") + +        assert not self.db.loadAllConfigs() +        assert self.db.loadConfig("plugin") == "" + +    def test_parser(self): +        assert self.config.get("general", "language") +        self.config["general"]["language"] = "de" +        assert self.config["general"]["language"] == "de" +        assert self.config.get("general", "language", adminUser) == "de" + +    def test_user(self): +        self.addConfig() + +        assert self.config["plugin"]["value"] == "default" +        assert self.config.get("plugin", "value", adminUser) == "default" +        assert self.config.get("plugin", "value", normalUser) == "default" + +        assert self.config.set("plugin", "value", False, user=normalUser) +        assert self.config.get("plugin", "value", normalUser) is False +        assert self.config["plugin"]["value"] == "default" + +        assert self.config.set("plugin", "value", True, user=adminUser) +        assert self.config.get("plugin", "value", adminUser) is True +        assert self.config["plugin"]["value"] is True +        assert self.config.get("plugin", "value", normalUser) is False + +        self.config.delete("plugin", normalUser) +        assert self.config.get("plugin", "value", normalUser) == "default" + +        self.config.delete("plugin") +        assert self.config.get("plugin", "value", adminUser) == "default" +        assert self.config["plugin"]["value"] == "default" + +        # should not trigger something +        self.config.delete("foo") + +    def test_sections(self): +        self.addConfig() + +        i = 0 +        # there should be only one section, with no values +        for name, config, values in self.config.iterSections(adminUser): +            assert name == "plugin" +            assert values == {} +            i +=1 +        assert i == 1 + +        assert self.config.set("plugin", "value", True, user=adminUser) + +        i = 0 +        # now we assert the correct values +        for name, config, values in self.config.iterSections(adminUser): +            assert name == "plugin" +            assert values == {"value":True} +            i +=1 +        assert i == 1 + +    def test_get_section(self): +        self.addConfig() +        self.assertEqual(self.config.getSection("plugin")[0].label, "Name") + +    # TODO: more save tests are needed +    def test_saveValues(self): +        self.addConfig() +        self.config.saveValues(adminUser, "plugin") + +    @raises(InvalidConfigSection) +    def test_restricted_access(self): +        self.config.get("general", "language", normalUser) + +    @raises(InvalidConfigSection) +    def test_error(self): +        self.config.get("foo", "bar") + +    @raises(InvalidConfigSection) +    def test_error_set(self): +        self.config.set("foo", "bar", True)
\ No newline at end of file diff --git a/tests/manager/test_filemanager.py b/tests/manager/test_filemanager.py new file mode 100644 index 000000000..a7507cada --- /dev/null +++ b/tests/manager/test_filemanager.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- + +from random import choice + +from tests.helper.Stubs import Core +from tests.helper.BenchmarkTest import BenchmarkTest + +from pyload.database import DatabaseBackend +# disable asyncronous queries +DatabaseBackend.async = DatabaseBackend.queue + +from pyload.Api import DownloadState +from pyload.FileManager import FileManager + + +class TestFileManager(BenchmarkTest): +    bench = ["add_packages", "add_files", "get_files_root", "get", +             "get_package_content", "get_package_tree", +             "order_package", "order_files", "move"] + +    pids = [-1] +    count = 100 + +    @classmethod +    def setUpClass(cls): +        c = Core() +        # db needs seperate initialisation +        cls.db = c.db = DatabaseBackend(c) +        cls.db.setup() +        cls.db.purgeAll() + +        cls.m = cls.db.manager = FileManager(c) + +    @classmethod +    def tearDownClass(cls): +        cls.db.purgeAll() +        cls.db.shutdown() + + +    # benchmarker ignore setup +    def setUp(self): +        self.db.purgeAll() +        self.pids = [-1] + +        self.count = 20 +        self.test_add_packages() +        self.test_add_files() + +    def test_add_packages(self): +        for i in range(100): +            pid = self.m.addPackage("name", "folder", choice(self.pids), "", "", "", False) +            self.pids.append(pid) + +        if -1 in self.pids: +            self.pids.remove(-1) + +    def test_add_files(self): +        for pid in self.pids: +            self.m.addLinks([("plugin %d" % i, "url %s" % i) for i in range(self.count)], pid) + +        count = self.m.getQueueStats()[0] +        files = self.count * len(self.pids) +        # in test runner files get added twice +        assert count == files or count == files * 2 + +    def test_get(self): +        info = self.m.getPackageInfo(choice(self.pids)) +        assert info.stats.linkstotal == self.count + +        fid = choice(info.fids) +        f = self.m.getFile(fid) +        assert f.fid in self.m.files + +        f.name = "new name" +        f.sync() +        finfo = self.m.getFileInfo(fid) +        assert finfo is not None +        assert finfo.name == "new name" + +        p = self.m.getPackage(choice(self.pids)) +        assert p is not None +        assert p.pid in self.m.packages +        p.sync() + +        p.delete() + +        self.m.getTree(-1, True, None) + +    def test_get_filtered(self): +        all = self.m.getTree(-1, True, None) +        finished = self.m.getTree(-1, True, DownloadState.Finished) +        unfinished = self.m.getTree(-1, True, DownloadState.Unfinished) + +        assert len(finished.files) + len(unfinished.files) == len(all.files) == self.m.db.filecount() + + +    def test_get_files_root(self): +        view = self.m.getTree(-1, True, None) + +        for pid in self.pids: +            assert pid in view.packages + +        assert len(view.packages) == len(self.pids) + +        p = choice(view.packages.values()) +        assert len(p.fids) == self.count +        assert p.stats.linkstotal == self.count + + +    def test_get_package_content(self): +        view = self.m.getTree(choice(self.pids), False, None) +        p = view.root + +        assert len(view.packages) == len(p.pids) +        for pid in p.pids: assert pid in view.packages + +    def test_get_package_tree(self): +        view = self.m.getTree(choice(self.pids), True, None) +        for pid in view.root.pids: assert pid in view.packages +        for fid in view.root.fids: assert fid in view.files + +    def test_delete(self): +        self.m.deleteFile(self.count * 5) +        self.m.deletePackage(choice(self.pids)) + +    def test_order_package(self): +        parent = self.m.addPackage("order", "", -1, "", "", "", False) +        self.m.addLinks([("url", "plugin") for i in range(100)], parent) + +        pids = [self.m.addPackage("c", "", parent, "", "", "", False) for i in range(5)] +        v = self.m.getTree(parent, False, None) +        self.assert_ordered(pids, 0, 5, v.root.pids, v.packages, True) + +        pid = v.packages.keys()[0] +        self.assert_pack_ordered(parent, pid, 3) +        self.assert_pack_ordered(parent, pid, 0) +        self.assert_pack_ordered(parent, pid, 0) +        self.assert_pack_ordered(parent, pid, 4) +        pid = v.packages.keys()[2] +        self.assert_pack_ordered(parent, pid, 4) +        self.assert_pack_ordered(parent, pid, 3) +        self.assert_pack_ordered(parent, pid, 2) + + +    def test_order_files(self): +        parent = self.m.addPackage("order", "", -1, "", "", "", False) +        self.m.addLinks([("url", "plugin") for i in range(100)], parent) +        v = self.m.getTree(parent, False, None) + +        fids = v.root.fids[10:20] +        v = self.assert_files_ordered(parent, fids, 0) + +        fids = v.root.fids[20:30] + +        self.m.orderFiles(fids, parent, 99) +        v = self.m.getTree(parent, False, None) +        assert fids[-1] == v.root.fids[-1] +        assert fids[0] == v.root.fids[90] +        self.assert_ordered(fids, 90, 100, v.root.fids, v.files) + +        fids = v.root.fids[80:] +        v = self.assert_files_ordered(parent, fids, 20) + +        self.m.orderFiles(fids, parent, 80) +        v = self.m.getTree(parent, False, None) +        self.assert_ordered(fids, 61, 81, v.root.fids, v.files) + +        fids = v.root.fids[50:51] +        self.m.orderFiles(fids, parent, 99) +        v = self.m.getTree(parent, False, None) +        self.assert_ordered(fids, 99, 100, v.root.fids, v.files) + +        fids = v.root.fids[50:51] +        v = self.assert_files_ordered(parent, fids, 0) + + +    def assert_files_ordered(self, parent, fids, pos): +        fs = [self.m.getFile(choice(fids)) for i in range(5)] +        self.m.orderFiles(fids, parent, pos) +        v = self.m.getTree(parent, False, False) +        self.assert_ordered(fids, pos, pos+len(fids), v.root.fids, v.files) + +        return v + +    def assert_pack_ordered(self, parent, pid, pos): +        self.m.orderPackage(pid, pos) +        v = self.m.getTree(parent, False, False) +        self.assert_ordered([pid], pos, pos+1, v.root.pids, v.packages, True) + +    # assert that ordering is total, complete with no gaps +    def assert_ordered(self, part, start, end, data, dict, pack=False): +        assert data[start:end] == part +        if pack: +            assert sorted([p.packageorder for p in dict.values()]) == range(len(dict)) +            assert [dict[pid].packageorder for pid in part] == range(start, end) +        else: +            assert sorted([f.fileorder for f in dict.values()]) == range(len(dict)) +            assert [dict[fid].fileorder for fid in part] == range(start, end) + + +    def test_move(self): + +        pid = self.pids[-1] +        pid2 = self.pids[1] + +        self.m.movePackage(pid, -1) +        v = self.m.getTree(-1, False, False) + +        assert v.root.pids[-1] == pid +        assert sorted([p.packageorder for p in v.packages.values()]) == range(len(v.packages)) + +        v = self.m.getTree(pid, False, False) +        fids = v.root.fids[10:20] +        self.m.moveFiles(fids, pid2) +        v = self.m.getTree(pid2, False, False) + +        assert sorted([f.fileorder for f in v.files.values()]) == range(len(v.files)) +        assert len(v.files) == self.count + len(fids) + + + +if __name__ == "__main__": +    TestFileManager.benchmark()
\ No newline at end of file diff --git a/tests/manager/test_interactionManager.py b/tests/manager/test_interactionManager.py new file mode 100644 index 000000000..5ff74c0f0 --- /dev/null +++ b/tests/manager/test_interactionManager.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase + +from tests.helper.Stubs import Core + +from pyload.Api import InputType, Interaction +from pyload.interaction.InteractionManager import InteractionManager + + +class TestInteractionManager(TestCase): +    ADMIN = None +    USER = 1 + +    def assertEmpty(self, list1): +        return self.assertListEqual(list1, []) + +    @classmethod +    def setUpClass(cls): +        cls.core = Core() + +    def setUp(self): +        self.im = InteractionManager(self.core) + +        self.assertFalse(self.im.isClientConnected(self.ADMIN)) +        self.assertFalse(self.im.isTaskWaiting(self.ADMIN)) +        self.assertEmpty(self.im.getTasks(self.ADMIN)) + +    def test_notifications(self): +        n = self.im.createNotification("test", "notify") + +        self.assertTrue(self.im.isTaskWaiting(self.ADMIN)) +        self.assertListEqual(self.im.getTasks(self.ADMIN), [n]) + +        n.seen = True +        self.assertFalse(self.im.isTaskWaiting(self.ADMIN)) + +        for i in range(10): +            self.im.createNotification("title", "test") + +        self.assertEqual(len(self.im.getTasks(self.ADMIN)), 11) +        self.assertFalse(self.im.getTasks(self.USER)) +        self.assertFalse(self.im.getTasks(self.ADMIN, Interaction.Query)) + + +    def test_captcha(self): +        t = self.im.createCaptchaTask("1", "png", "", owner=self.ADMIN) + +        self.assertEqual(t.type, Interaction.Captcha) +        self.assertListEqual(self.im.getTasks(self.ADMIN), [t]) +        self.assertEmpty(self.im.getTasks(self.USER)) +        t.setShared() +        self.assertListEqual(self.im.getTasks(self.USER), [t]) + +        t2 = self.im.createCaptchaTask("2", "png", "", owner=self.USER) +        self.assertTrue(self.im.isTaskWaiting(self.USER)) +        self.assertEmpty(self.im.getTasks(self.USER, Interaction.Query)) +        self.im.removeTask(t) + +        self.assertListEqual(self.im.getTasks(self.ADMIN), [t2]) +        self.assertIs(self.im.getTaskByID(t2.iid), t2) + + +    def test_query(self): +        t = self.im.createQueryTask(InputType.Text, "text", owner=self.ADMIN) + +        self.assertEqual(t.description, "text") +        self.assertListEqual(self.im.getTasks(self.ADMIN, Interaction.Query), [t]) +        self.assertEmpty(self.im.getTasks(Interaction.Captcha)) + + +    def test_clients(self): +        self.im.getTasks(self.ADMIN, Interaction.Captcha) + +        self.assertTrue(self.im.isClientConnected(self.ADMIN)) +        self.assertFalse(self.im.isClientConnected(self.USER)) + + +    def test_users(self): +        t = self.im.createCaptchaTask("1", "png", "", owner=self.USER) +        self.assertListEqual(self.im.getTasks(self.ADMIN), [t]) diff --git a/tests/nosetests.sh b/tests/nosetests.sh new file mode 100755 index 000000000..5b277ecb8 --- /dev/null +++ b/tests/nosetests.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +NS=nosetests +which nosetests2 > /dev/null && NS=nosetests2 +$NS tests/ --with-coverage --with-xunit --cover-package=pyload --cover-erase --process-timeout=60 +coverage xml diff --git a/tests/other/__init__.py b/tests/other/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/other/__init__.py diff --git a/tests/other/test_configparser.py b/tests/other/test_configparser.py new file mode 100644 index 000000000..0efd41aee --- /dev/null +++ b/tests/other/test_configparser.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +from nose.tools import raises + +from tests.helper.Stubs import Core + +from pyload.config.ConfigParser import ConfigParser + +class TestConfigParser(): + +    @classmethod +    def setUpClass(cls): +        # Only needed to get imports right +        cls.core = Core() +        cls.config = ConfigParser() + +    def test_dict(self): + +        assert self.config["general"]["language"] +        self.config["general"]["language"] = "de" +        assert self.config["general"]["language"] == "de" + +    def test_contains(self): + +        assert "general" in self.config +        assert "foobaar" not in self.config + +    def test_iter(self): +        for section, config, values in self.config.iterSections(): +            assert isinstance(section, basestring) +            assert isinstance(config.config, dict) +            assert isinstance(values, dict) + +    def test_get(self): +        assert self.config.getSection("general")[0].config + +    @raises(KeyError) +    def test_invalid_config(self): +        print self.config["invalid"]["config"] + +    @raises(KeyError) +    def test_invalid_section(self): +        print self.config["general"]["invalid"]
\ No newline at end of file diff --git a/tests/other/test_curlDownload.py b/tests/other/test_curlDownload.py new file mode 100644 index 000000000..17af1cdd4 --- /dev/null +++ b/tests/other/test_curlDownload.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +from os import stat + +from unittest import TestCase + +from tests.helper.Stubs import Core +from pyload.network.Bucket import Bucket +from pyload.plugins.network.CurlRequest import CurlRequest +from pyload.plugins.network.CurlDownload import CurlDownload + +class TestCurlRequest(TestCase): + +    cookieURL = "http://forum.pyload.org" + +    def setUp(self): +        self.dl = CurlDownload(Bucket()) + +    def tearDown(self): +        self.dl.close() + +    def test_download(self): + +        assert self.dl.context is not None + +        self.dl.download("http://pyload.org/lib/tpl/pyload/images/pyload-logo-edited3.5-new-font-small.png", "/tmp/random.bin") + +        print self.dl.size, self.dl.arrived +        assert self.dl.size == self.dl.arrived > 0 +        assert stat("/tmp/random.bin").st_size == self.dl.size + +    def test_cookies(self): + +        req = CurlRequest({}) +        req.load(self.cookieURL) + +        assert len(req.cj) > 0 + +        dl = CurlDownload(Bucket(), req) + +        assert req.context is dl.context is not None + +        dl.download(self.cookieURL + "/cookies.php", "cookies.txt") +        cookies = open("cookies.txt", "rb").read().splitlines() + +        self.assertEqual(len(cookies), len(dl.context)) +        for c in cookies: +            k, v = c.strip().split(":") +            self.assertIn(k, req.cj) + + +    def test_attributes(self): +        assert self.dl.size == 0 +        assert self.dl.speed == 0 +        assert self.dl.arrived == 0
\ No newline at end of file diff --git a/tests/other/test_curlRequest.py b/tests/other/test_curlRequest.py new file mode 100644 index 000000000..6bd4a2772 --- /dev/null +++ b/tests/other/test_curlRequest.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +from tests.helper.Stubs import Core +from pyload.plugins.network.CurlRequest import CurlRequest + +from unittest import TestCase + + +class TestCurlRequest(TestCase): +    # This page provides a test which prints all set cookies +    cookieURL = "http://forum.pyload.org" + +    def setUp(self): +        self.req = CurlRequest({}) + +    def tearDown(self): +        self.req.close() + +    def test_load(self): +        self.req.load("http://pyload.org") + +    def test_cookies(self): +        self.req.load(self.cookieURL, cookies=False) +        assert len(self.req.cj) == 0 + +        self.req.load(self.cookieURL) +        assert len(self.req.cj) > 1 + +        cookies = dict([c.strip().split(":") for c in self.req.load(self.cookieURL + "/cookies.php").splitlines()]) +        for k, v in cookies.iteritems(): +            self.assertIn(k, self.req.cj) +            self.assertEqual(v, self.req.cj[k].value) + +        for c in self.req.cj: +            self.assertIn(c, cookies) + +        cookies = self.req.load(self.cookieURL + "/cookies.php", cookies=False) +        self.assertEqual(cookies, "") + + +    def test_auth(self): +        pass
\ No newline at end of file diff --git a/tests/other/test_filedatabase.py b/tests/other/test_filedatabase.py new file mode 100644 index 000000000..f2a60e997 --- /dev/null +++ b/tests/other/test_filedatabase.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- + +from tests.helper.Stubs import Core +from tests.helper.BenchmarkTest import BenchmarkTest + +from pyload.Api import DownloadState, PackageInfo, FileInfo +from pyload.database import DatabaseBackend + +# disable asyncronous queries +DatabaseBackend.async = DatabaseBackend.queue + +from random import choice + +class TestDatabase(BenchmarkTest): +    bench = ["insert", "insert_links", "insert_many", "get_packages", +             "get_files", "get_files_queued", "get_package_childs", "get_package_files", +             "get_package_data", "get_file_data", "find_files", "collector", "purge"] +    pids = None +    fids = None +    owner = 123 +    pstatus = 0 + +    @classmethod +    def setUpClass(cls): +        cls.pids = [-1] +        cls.fids = [] + +        cls.db = DatabaseBackend(Core()) +        cls.db.manager = cls.db.core + +        cls.db.setup() +        cls.db.purgeAll() + +    @classmethod +    def tearDownClass(cls): +        cls.db.purgeAll() +        cls.db.shutdown() + +    # benchmarker ignore setup +    def setUp(self): +        self.db.purgeAll() +        self.pids = [-1] +        self.fids = [] + +        self.test_insert(20) +        self.test_insert_many() +        self.fids = self.db.getAllFiles().keys() + + +    def test_insert(self, n=200): +        for i in range(n): +            pid = self.db.addPackage("name", "folder", choice(self.pids), "password", "site", "comment", self.pstatus, +                self.owner) +            self.pids.append(pid) + +    def test_insert_links(self): +        for i in range(10000): +            fid = self.db.addLink("url %s" % i, "name", "plugin", choice(self.pids), self.owner) +            self.fids.append(fid) + +    def test_insert_many(self): +        for pid in self.pids: +            self.db.addLinks([("url %s" % i, "plugin") for i in range(50)], pid, self.owner) + +    def test_get_packages(self): +        packs = self.db.getAllPackages() +        n = len(packs) +        assert n == len(self.pids) - 1 + +        print "Fetched %d packages" % n +        self.assert_pack(choice(packs.values())) + +    def test_get_files(self): +        files = self.db.getAllFiles() +        n = len(files) +        assert n >= len(self.pids) + +        print "Fetched %d files" % n +        self.assert_file(choice(files.values())) + +    def test_get_files_queued(self): +        files = self.db.getAllFiles(state=DownloadState.Unfinished) +        print "Fetched %d files queued" % len(files) + +    def test_delete(self): +        pid = choice(self.pids) +        self.db.deletePackage(pid) +        self.pids.remove(pid) + +    def test_get_package_childs(self): +        pid = choice(self.pids) +        packs = self.db.getAllPackages(root=pid) + +        print "Package %d has %d packages" % (pid, len(packs)) +        self.assert_pack(choice(packs.values())) + +    def test_get_package_files(self): +        pid = choice(self.pids) +        files = self.db.getAllFiles(package=pid) + +        print "Package %d has %d files" % (pid, len(files)) +        self.assert_file(choice(files.values())) + +    def test_get_package_data(self, stats=False): +        pid = choice(self.pids) +        p = self.db.getPackageInfo(pid, stats) +        self.assert_pack(p) +        # test again with stat +        if not stats: +            self.test_get_package_data(True) + +    def test_get_file_data(self): +        fid = choice(self.fids) +        f = self.db.getFileInfo(fid) +        self.assert_file(f) + +    def test_find_files(self): +        files = self.db.getAllFiles(search="1") +        print "Found %s files" % len(files) +        f = choice(files.values()) + +        assert "1" in f.name +        names = self.db.getMatchingFilenames("1") +        for name in names: +            assert "1" in name + +    def test_collector(self): +        self.db.saveCollector(0, "data") +        assert self.db.retrieveCollector(0) == "data" +        self.db.deleteCollector(0) + +    def test_purge(self): +        self.db.purgeLinks() + + +    def test_user_context(self): +        self.db.purgeAll() + +        p1 = self.db.addPackage("name", "folder", 0, "password", "site", "comment", self.pstatus, 0) +        self.db.addLink("url", "name", "plugin", p1, 0) +        p2 = self.db.addPackage("name", "folder", 0, "password", "site", "comment", self.pstatus, 1) +        self.db.addLink("url", "name", "plugin", p2, 1) + +        assert len(self.db.getAllPackages(owner=0)) == 1 == len(self.db.getAllFiles(owner=0)) +        assert len(self.db.getAllPackages(root=0, owner=0)) == 1 == len(self.db.getAllFiles(package=p1, owner=0)) +        assert len(self.db.getAllPackages(owner=1)) == 1 == len(self.db.getAllFiles(owner=1)) +        assert len(self.db.getAllPackages(root=0, owner=1)) == 1 == len(self.db.getAllFiles(package=p2, owner=1)) +        assert len(self.db.getAllPackages()) == 2 == len(self.db.getAllFiles()) + +        self.db.deletePackage(p1, 1) +        assert len(self.db.getAllPackages(owner=0)) == 1 == len(self.db.getAllFiles(owner=0)) +        self.db.deletePackage(p1, 0) +        assert len(self.db.getAllPackages(owner=1)) == 1 == len(self.db.getAllFiles(owner=1)) +        self.db.deletePackage(p2) + +        assert len(self.db.getAllPackages()) == 0 + +    def test_count(self): +        self.db.purgeAll() + +        assert self.db.downloadstats() == (0,0) +        assert self.db.queuestats() == (0,0) +        assert self.db.processcount() == 0 + +    def test_update(self): +        p1 = self.db.addPackage("name", "folder", 0, "password", "site", "comment", self.pstatus, 0) +        pack = self.db.getPackageInfo(p1) +        assert isinstance(pack, PackageInfo) + +        pack.folder = "new folder" +        pack.comment = "lol" +        pack.tags.append("video") + +        self.db.updatePackage(pack) + +        pack = self.db.getPackageInfo(p1) +        assert pack.folder == "new folder" +        assert pack.comment == "lol" +        assert "video" in pack.tags + +    def assert_file(self, f): +        try: +            assert f is not None +            assert isinstance(f, FileInfo) +            self.assert_int(f, ("fid", "status", "size", "media", "fileorder", "added", "package", "owner")) +            assert f.status in range(5) +            assert f.owner == self.owner +            assert f.media in range(1024) +            assert f.package in self.pids +            assert f.added > 10 ** 6 # date is usually big integer +        except: +            print f +            raise + +    def assert_pack(self, p): +        try: +            assert p is not None +            assert isinstance(p, PackageInfo) +            self.assert_int(p, ("pid", "root", "added", "status", "packageorder", "owner")) +            assert p.pid in self.pids +            assert p.owner == self.owner +            assert p.status in range(5) +            assert p.root in self.pids +            assert p.added > 10 ** 6 +            assert isinstance(p.tags, list) +            assert p.shared in (0, 1) +        except: +            print p +            raise + +    def assert_int(self, obj, list): +        for attr in list: assert type(getattr(obj, attr)) == int + +if __name__ == "__main__": +    TestDatabase.benchmark()
\ No newline at end of file diff --git a/tests/other/test_requestFactory.py b/tests/other/test_requestFactory.py new file mode 100644 index 000000000..751e7f03b --- /dev/null +++ b/tests/other/test_requestFactory.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +from tests.helper.Stubs import Core + +from pyload.plugins.network.CurlRequest import CurlRequest +from pyload.network.RequestFactory import RequestFactory + +class TestRequestFactory: + +    @classmethod +    def setUpClass(cls): +        cls.req = RequestFactory(Core()) + +    def test_get_request(self): +        req = self.req.getRequest() + +        new_req = self.req.getRequest(req.getContext()) +        assert new_req.getContext() == req.getContext() + +        cj = CurlRequest.CONTEXT_CLASS() +        assert self.req.getRequest(cj).context is cj + +    def test_get_request_class(self): + +        self.req.getRequest(None, CurlRequest) + +    def test_get_download(self): +        dl = self.req.getDownloadRequest() +        dl.close() + +        # with given request +        req = self.req.getRequest() +        dl = self.req.getDownloadRequest(req) + +        assert req.context is dl.context +        assert req.options is dl.options + +        dl.close()
\ No newline at end of file diff --git a/tests/other/test_syntax.py b/tests/other/test_syntax.py new file mode 100644 index 000000000..396fc8f4b --- /dev/null +++ b/tests/other/test_syntax.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +from os import walk +from os.path import abspath, dirname, join + +from unittest import TestCase + +PATH = abspath(join(dirname(abspath(__file__)), "..", "..", "")) + +# needed to register globals +from tests.helper import Stubs + +class TestSyntax(TestCase): +    pass + + +for path, dirs, files in walk(join(PATH, "pyload")): + +    for f in files: +        if not f.endswith(".py") or f.startswith("__"): continue +        fpath = join(path, f) +        pack = fpath.replace(PATH, "")[1:-3] #replace / and  .py +        imp = pack.replace("/", ".") +        packages = imp.split(".") +        #__import__(imp) + +        # to much sideeffect when importing +        if "web" in packages or "lib" in packages: continue + +        # currying +        def meta(imp, sig): +            def _test(self=None): +                __import__(imp) + +            _test.func_name = sig +            return _test + +        # generate test methods +        sig = "test_%s_%s" % (packages[-2], packages[-1]) +        setattr(TestSyntax, sig, meta(imp, sig))
\ No newline at end of file diff --git a/tests/plugin_tests.sh b/tests/plugin_tests.sh new file mode 100755 index 000000000..789dddd91 --- /dev/null +++ b/tests/plugin_tests.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +NS=nosetests +which nosetests2 > /dev/null && NS=nosetests2 +# must be executed within tests dir +cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +$NS HosterPluginTester.py CrypterPluginTester.py -s --with-xunit --with-coverage --cover-erase --cover-package=pyload.plugins --with-id +coverage xml diff --git a/tests/quit_pyload.sh b/tests/quit_pyload.sh new file mode 100755 index 000000000..3bfb3e13a --- /dev/null +++ b/tests/quit_pyload.sh @@ -0,0 +1,7 @@ +#!/bin/bash +PYTHON=python +which python2 > /dev/null && PYTHON=python2 +$PYTHON pyload.py --configdir=tests/config --quit +if [ -d userplugins ]; then +    rm -r userplugins +fi
\ No newline at end of file diff --git a/tests/run_pyload.sh b/tests/run_pyload.sh new file mode 100755 index 000000000..9ffc04b4f --- /dev/null +++ b/tests/run_pyload.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +PIDFILE="tests/pyload.pid" +if [ -f "$PIDFILE" ] +then +    kill -9 $(<"$PIDFILE") +fi + +cp tests/config/pyload.db.org tests/config/pyload.db +cp tests/config/pyload.conf.org tests/config/pyload.conf + +PYTHON=python +which python2 > /dev/null && PYTHON=python2 + +touch pyload.out +$PYTHON pyload.py -d --configdir=tests/config > pyload.out 2> pyload.err & + +for i in {1..30}; do +    grep "pyLoad is up and running" pyload.out > /dev/null && echo "pyLoad started" && break +    sleep 1 +done + +echo "pyLoad start script finished" + diff --git a/tests/test_api.py b/tests/test_api.py deleted file mode 100644 index f8901f731..000000000 --- a/tests/test_api.py +++ /dev/null @@ -1,20 +0,0 @@ -# -*- coding: utf-8 -*- - -from module.common import APIExerciser -from nose.tools import nottest - - -class TestApi: - -    def __init__(self): -        self.api = APIExerciser.APIExerciser(None, True, "TestUser", "pwhere") - -    def test_login(self): -        assert self.api.api.login("crapp", "wrong pw") is False - -    #takes really long, only test when needed -    @nottest -    def test_random(self): - -        for i in range(0, 100): -            self.api.testAPI() diff --git a/tests/test_json.py b/tests/test_json.py deleted file mode 100644 index ff56e8f5a..000000000 --- a/tests/test_json.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- - -from urllib import urlencode -from urllib2 import urlopen, HTTPError -from json import loads - -from logging import log - -url = "http://localhost:8001/api/%s" - -class TestJson: - -    def call(self, name, post=None): -        if not post: post = {} -        post["session"] = self.key -        u = urlopen(url % name, data=urlencode(post)) -        return loads(u.read()) - -    def setUp(self): -        u = urlopen(url % "login", data=urlencode({"username": "TestUser", "password": "pwhere"})) -        self.key = loads(u.read()) -        assert self.key is not False - -    def test_wronglogin(self): -        u = urlopen(url % "login", data=urlencode({"username": "crap", "password": "wrongpw"})) -        assert loads(u.read()) is False - -    def test_access(self): -        try: -            urlopen(url % "getServerVersion") -        except HTTPError, e: -            assert e.code == 403 -        else: -            assert False - -    def test_status(self): -        ret = self.call("statusServer") -        log(1, str(ret)) -        assert "pause" in ret -        assert "queue" in ret - -    def test_unknown_method(self): -        try: -            self.call("notExisting") -        except HTTPError, e: -            assert e.code == 404 -        else: -            assert False
\ No newline at end of file | 
