diff options
Diffstat (limited to 'tests/manager')
-rw-r--r-- | tests/manager/__init__.py | 0 | ||||
-rw-r--r-- | tests/manager/test_configManager.py | 135 | ||||
-rw-r--r-- | tests/manager/test_filemanager.py | 223 | ||||
-rw-r--r-- | tests/manager/test_interactionManager.py | 81 |
4 files changed, 439 insertions, 0 deletions
diff --git a/tests/manager/__init__.py b/tests/manager/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/manager/__init__.py diff --git a/tests/manager/test_configManager.py b/tests/manager/test_configManager.py new file mode 100644 index 000000000..97b43dd66 --- /dev/null +++ b/tests/manager/test_configManager.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase +from collections import defaultdict + +from nose.tools import raises + +from tests.helper.Stubs import Core + +from pyload.Api import InvalidConfigSection +from pyload.database import DatabaseBackend +from pyload.config.ConfigParser import ConfigParser +from pyload.config.ConfigManager import ConfigManager + +adminUser = None +normalUser = 1 + +class TestConfigManager(TestCase): + + @classmethod + def setUpClass(cls): + cls.core = Core() + cls.db = DatabaseBackend(cls.core) + cls.core.db = cls.db + cls.db.manager = cls.core + cls.db.manager.statusMsg = defaultdict(lambda: "statusmsg") + cls.parser = ConfigParser() + cls.config = ConfigManager(cls.core, cls.parser) + cls.db.setup() + + def setUp(self): + self.db.clearAllConfigs() + # used by some tests, needs to be deleted + self.config.delete("plugin", adminUser) + + + def addConfig(self): + self.config.addConfigSection("plugin", "Name", "desc", "something", + [("value", "str", "label", "desc", "default")]) + + def test_db(self): + + self.db.saveConfig("plugin", "some value", 0) + self.db.saveConfig("plugin", "some other value", 1) + + assert self.db.loadConfig("plugin", 0) == "some value" + assert self.db.loadConfig("plugin", 1) == "some other value" + + d = self.db.loadAllConfigs() + assert d[0]["plugin"] == "some value" + assert self.db.loadConfigsForUser(0)["plugin"] == "some value" + + self.db.deleteConfig("plugin", 0) + + assert 0 not in self.db.loadAllConfigs() + assert "plugin" not in self.db.loadConfigsForUser(0) + + self.db.deleteConfig("plugin") + + assert not self.db.loadAllConfigs() + assert self.db.loadConfig("plugin") == "" + + def test_parser(self): + assert self.config.get("general", "language") + self.config["general"]["language"] = "de" + assert self.config["general"]["language"] == "de" + assert self.config.get("general", "language", adminUser) == "de" + + def test_user(self): + self.addConfig() + + assert self.config["plugin"]["value"] == "default" + assert self.config.get("plugin", "value", adminUser) == "default" + assert self.config.get("plugin", "value", normalUser) == "default" + + assert self.config.set("plugin", "value", False, user=normalUser) + assert self.config.get("plugin", "value", normalUser) is False + assert self.config["plugin"]["value"] == "default" + + assert self.config.set("plugin", "value", True, user=adminUser) + assert self.config.get("plugin", "value", adminUser) is True + assert self.config["plugin"]["value"] is True + assert self.config.get("plugin", "value", normalUser) is False + + self.config.delete("plugin", normalUser) + assert self.config.get("plugin", "value", normalUser) == "default" + + self.config.delete("plugin") + assert self.config.get("plugin", "value", adminUser) == "default" + assert self.config["plugin"]["value"] == "default" + + # should not trigger something + self.config.delete("foo") + + def test_sections(self): + self.addConfig() + + i = 0 + # there should be only one section, with no values + for name, config, values in self.config.iterSections(adminUser): + assert name == "plugin" + assert values == {} + i +=1 + assert i == 1 + + assert self.config.set("plugin", "value", True, user=adminUser) + + i = 0 + # now we assert the correct values + for name, config, values in self.config.iterSections(adminUser): + assert name == "plugin" + assert values == {"value":True} + i +=1 + assert i == 1 + + def test_get_section(self): + self.addConfig() + assert self.config.getSection("plugin")[0].name == "Name" + + # TODO: more save tests are needed + def test_saveValues(self): + self.addConfig() + self.config.saveValues(adminUser, "plugin") + + @raises(InvalidConfigSection) + def test_restricted_access(self): + self.config.get("general", "language", normalUser) + + @raises(InvalidConfigSection) + def test_error(self): + self.config.get("foo", "bar") + + @raises(InvalidConfigSection) + def test_error_set(self): + self.config.set("foo", "bar", True)
\ No newline at end of file diff --git a/tests/manager/test_filemanager.py b/tests/manager/test_filemanager.py new file mode 100644 index 000000000..a7507cada --- /dev/null +++ b/tests/manager/test_filemanager.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- + +from random import choice + +from tests.helper.Stubs import Core +from tests.helper.BenchmarkTest import BenchmarkTest + +from pyload.database import DatabaseBackend +# disable asyncronous queries +DatabaseBackend.async = DatabaseBackend.queue + +from pyload.Api import DownloadState +from pyload.FileManager import FileManager + + +class TestFileManager(BenchmarkTest): + bench = ["add_packages", "add_files", "get_files_root", "get", + "get_package_content", "get_package_tree", + "order_package", "order_files", "move"] + + pids = [-1] + count = 100 + + @classmethod + def setUpClass(cls): + c = Core() + # db needs seperate initialisation + cls.db = c.db = DatabaseBackend(c) + cls.db.setup() + cls.db.purgeAll() + + cls.m = cls.db.manager = FileManager(c) + + @classmethod + def tearDownClass(cls): + cls.db.purgeAll() + cls.db.shutdown() + + + # benchmarker ignore setup + def setUp(self): + self.db.purgeAll() + self.pids = [-1] + + self.count = 20 + self.test_add_packages() + self.test_add_files() + + def test_add_packages(self): + for i in range(100): + pid = self.m.addPackage("name", "folder", choice(self.pids), "", "", "", False) + self.pids.append(pid) + + if -1 in self.pids: + self.pids.remove(-1) + + def test_add_files(self): + for pid in self.pids: + self.m.addLinks([("plugin %d" % i, "url %s" % i) for i in range(self.count)], pid) + + count = self.m.getQueueStats()[0] + files = self.count * len(self.pids) + # in test runner files get added twice + assert count == files or count == files * 2 + + def test_get(self): + info = self.m.getPackageInfo(choice(self.pids)) + assert info.stats.linkstotal == self.count + + fid = choice(info.fids) + f = self.m.getFile(fid) + assert f.fid in self.m.files + + f.name = "new name" + f.sync() + finfo = self.m.getFileInfo(fid) + assert finfo is not None + assert finfo.name == "new name" + + p = self.m.getPackage(choice(self.pids)) + assert p is not None + assert p.pid in self.m.packages + p.sync() + + p.delete() + + self.m.getTree(-1, True, None) + + def test_get_filtered(self): + all = self.m.getTree(-1, True, None) + finished = self.m.getTree(-1, True, DownloadState.Finished) + unfinished = self.m.getTree(-1, True, DownloadState.Unfinished) + + assert len(finished.files) + len(unfinished.files) == len(all.files) == self.m.db.filecount() + + + def test_get_files_root(self): + view = self.m.getTree(-1, True, None) + + for pid in self.pids: + assert pid in view.packages + + assert len(view.packages) == len(self.pids) + + p = choice(view.packages.values()) + assert len(p.fids) == self.count + assert p.stats.linkstotal == self.count + + + def test_get_package_content(self): + view = self.m.getTree(choice(self.pids), False, None) + p = view.root + + assert len(view.packages) == len(p.pids) + for pid in p.pids: assert pid in view.packages + + def test_get_package_tree(self): + view = self.m.getTree(choice(self.pids), True, None) + for pid in view.root.pids: assert pid in view.packages + for fid in view.root.fids: assert fid in view.files + + def test_delete(self): + self.m.deleteFile(self.count * 5) + self.m.deletePackage(choice(self.pids)) + + def test_order_package(self): + parent = self.m.addPackage("order", "", -1, "", "", "", False) + self.m.addLinks([("url", "plugin") for i in range(100)], parent) + + pids = [self.m.addPackage("c", "", parent, "", "", "", False) for i in range(5)] + v = self.m.getTree(parent, False, None) + self.assert_ordered(pids, 0, 5, v.root.pids, v.packages, True) + + pid = v.packages.keys()[0] + self.assert_pack_ordered(parent, pid, 3) + self.assert_pack_ordered(parent, pid, 0) + self.assert_pack_ordered(parent, pid, 0) + self.assert_pack_ordered(parent, pid, 4) + pid = v.packages.keys()[2] + self.assert_pack_ordered(parent, pid, 4) + self.assert_pack_ordered(parent, pid, 3) + self.assert_pack_ordered(parent, pid, 2) + + + def test_order_files(self): + parent = self.m.addPackage("order", "", -1, "", "", "", False) + self.m.addLinks([("url", "plugin") for i in range(100)], parent) + v = self.m.getTree(parent, False, None) + + fids = v.root.fids[10:20] + v = self.assert_files_ordered(parent, fids, 0) + + fids = v.root.fids[20:30] + + self.m.orderFiles(fids, parent, 99) + v = self.m.getTree(parent, False, None) + assert fids[-1] == v.root.fids[-1] + assert fids[0] == v.root.fids[90] + self.assert_ordered(fids, 90, 100, v.root.fids, v.files) + + fids = v.root.fids[80:] + v = self.assert_files_ordered(parent, fids, 20) + + self.m.orderFiles(fids, parent, 80) + v = self.m.getTree(parent, False, None) + self.assert_ordered(fids, 61, 81, v.root.fids, v.files) + + fids = v.root.fids[50:51] + self.m.orderFiles(fids, parent, 99) + v = self.m.getTree(parent, False, None) + self.assert_ordered(fids, 99, 100, v.root.fids, v.files) + + fids = v.root.fids[50:51] + v = self.assert_files_ordered(parent, fids, 0) + + + def assert_files_ordered(self, parent, fids, pos): + fs = [self.m.getFile(choice(fids)) for i in range(5)] + self.m.orderFiles(fids, parent, pos) + v = self.m.getTree(parent, False, False) + self.assert_ordered(fids, pos, pos+len(fids), v.root.fids, v.files) + + return v + + def assert_pack_ordered(self, parent, pid, pos): + self.m.orderPackage(pid, pos) + v = self.m.getTree(parent, False, False) + self.assert_ordered([pid], pos, pos+1, v.root.pids, v.packages, True) + + # assert that ordering is total, complete with no gaps + def assert_ordered(self, part, start, end, data, dict, pack=False): + assert data[start:end] == part + if pack: + assert sorted([p.packageorder for p in dict.values()]) == range(len(dict)) + assert [dict[pid].packageorder for pid in part] == range(start, end) + else: + assert sorted([f.fileorder for f in dict.values()]) == range(len(dict)) + assert [dict[fid].fileorder for fid in part] == range(start, end) + + + def test_move(self): + + pid = self.pids[-1] + pid2 = self.pids[1] + + self.m.movePackage(pid, -1) + v = self.m.getTree(-1, False, False) + + assert v.root.pids[-1] == pid + assert sorted([p.packageorder for p in v.packages.values()]) == range(len(v.packages)) + + v = self.m.getTree(pid, False, False) + fids = v.root.fids[10:20] + self.m.moveFiles(fids, pid2) + v = self.m.getTree(pid2, False, False) + + assert sorted([f.fileorder for f in v.files.values()]) == range(len(v.files)) + assert len(v.files) == self.count + len(fids) + + + +if __name__ == "__main__": + TestFileManager.benchmark()
\ No newline at end of file diff --git a/tests/manager/test_interactionManager.py b/tests/manager/test_interactionManager.py new file mode 100644 index 000000000..5ff74c0f0 --- /dev/null +++ b/tests/manager/test_interactionManager.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +from unittest import TestCase + +from tests.helper.Stubs import Core + +from pyload.Api import InputType, Interaction +from pyload.interaction.InteractionManager import InteractionManager + + +class TestInteractionManager(TestCase): + ADMIN = None + USER = 1 + + def assertEmpty(self, list1): + return self.assertListEqual(list1, []) + + @classmethod + def setUpClass(cls): + cls.core = Core() + + def setUp(self): + self.im = InteractionManager(self.core) + + self.assertFalse(self.im.isClientConnected(self.ADMIN)) + self.assertFalse(self.im.isTaskWaiting(self.ADMIN)) + self.assertEmpty(self.im.getTasks(self.ADMIN)) + + def test_notifications(self): + n = self.im.createNotification("test", "notify") + + self.assertTrue(self.im.isTaskWaiting(self.ADMIN)) + self.assertListEqual(self.im.getTasks(self.ADMIN), [n]) + + n.seen = True + self.assertFalse(self.im.isTaskWaiting(self.ADMIN)) + + for i in range(10): + self.im.createNotification("title", "test") + + self.assertEqual(len(self.im.getTasks(self.ADMIN)), 11) + self.assertFalse(self.im.getTasks(self.USER)) + self.assertFalse(self.im.getTasks(self.ADMIN, Interaction.Query)) + + + def test_captcha(self): + t = self.im.createCaptchaTask("1", "png", "", owner=self.ADMIN) + + self.assertEqual(t.type, Interaction.Captcha) + self.assertListEqual(self.im.getTasks(self.ADMIN), [t]) + self.assertEmpty(self.im.getTasks(self.USER)) + t.setShared() + self.assertListEqual(self.im.getTasks(self.USER), [t]) + + t2 = self.im.createCaptchaTask("2", "png", "", owner=self.USER) + self.assertTrue(self.im.isTaskWaiting(self.USER)) + self.assertEmpty(self.im.getTasks(self.USER, Interaction.Query)) + self.im.removeTask(t) + + self.assertListEqual(self.im.getTasks(self.ADMIN), [t2]) + self.assertIs(self.im.getTaskByID(t2.iid), t2) + + + def test_query(self): + t = self.im.createQueryTask(InputType.Text, "text", owner=self.ADMIN) + + self.assertEqual(t.description, "text") + self.assertListEqual(self.im.getTasks(self.ADMIN, Interaction.Query), [t]) + self.assertEmpty(self.im.getTasks(Interaction.Captcha)) + + + def test_clients(self): + self.im.getTasks(self.ADMIN, Interaction.Captcha) + + self.assertTrue(self.im.isClientConnected(self.ADMIN)) + self.assertFalse(self.im.isClientConnected(self.USER)) + + + def test_users(self): + t = self.im.createCaptchaTask("1", "png", "", owner=self.USER) + self.assertListEqual(self.im.getTasks(self.ADMIN), [t]) |