summaryrefslogtreecommitdiffstats
path: root/tests/manager
diff options
context:
space:
mode:
Diffstat (limited to 'tests/manager')
-rw-r--r--tests/manager/__init__.py0
-rw-r--r--tests/manager/test_configManager.py135
-rw-r--r--tests/manager/test_filemanager.py223
-rw-r--r--tests/manager/test_interactionManager.py81
4 files changed, 439 insertions, 0 deletions
diff --git a/tests/manager/__init__.py b/tests/manager/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/manager/__init__.py
diff --git a/tests/manager/test_configManager.py b/tests/manager/test_configManager.py
new file mode 100644
index 000000000..97b43dd66
--- /dev/null
+++ b/tests/manager/test_configManager.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+
+from unittest import TestCase
+from collections import defaultdict
+
+from nose.tools import raises
+
+from tests.helper.Stubs import Core
+
+from pyload.Api import InvalidConfigSection
+from pyload.database import DatabaseBackend
+from pyload.config.ConfigParser import ConfigParser
+from pyload.config.ConfigManager import ConfigManager
+
+adminUser = None
+normalUser = 1
+
+class TestConfigManager(TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.core = Core()
+ cls.db = DatabaseBackend(cls.core)
+ cls.core.db = cls.db
+ cls.db.manager = cls.core
+ cls.db.manager.statusMsg = defaultdict(lambda: "statusmsg")
+ cls.parser = ConfigParser()
+ cls.config = ConfigManager(cls.core, cls.parser)
+ cls.db.setup()
+
+ def setUp(self):
+ self.db.clearAllConfigs()
+ # used by some tests, needs to be deleted
+ self.config.delete("plugin", adminUser)
+
+
+ def addConfig(self):
+ self.config.addConfigSection("plugin", "Name", "desc", "something",
+ [("value", "str", "label", "desc", "default")])
+
+ def test_db(self):
+
+ self.db.saveConfig("plugin", "some value", 0)
+ self.db.saveConfig("plugin", "some other value", 1)
+
+ assert self.db.loadConfig("plugin", 0) == "some value"
+ assert self.db.loadConfig("plugin", 1) == "some other value"
+
+ d = self.db.loadAllConfigs()
+ assert d[0]["plugin"] == "some value"
+ assert self.db.loadConfigsForUser(0)["plugin"] == "some value"
+
+ self.db.deleteConfig("plugin", 0)
+
+ assert 0 not in self.db.loadAllConfigs()
+ assert "plugin" not in self.db.loadConfigsForUser(0)
+
+ self.db.deleteConfig("plugin")
+
+ assert not self.db.loadAllConfigs()
+ assert self.db.loadConfig("plugin") == ""
+
+ def test_parser(self):
+ assert self.config.get("general", "language")
+ self.config["general"]["language"] = "de"
+ assert self.config["general"]["language"] == "de"
+ assert self.config.get("general", "language", adminUser) == "de"
+
+ def test_user(self):
+ self.addConfig()
+
+ assert self.config["plugin"]["value"] == "default"
+ assert self.config.get("plugin", "value", adminUser) == "default"
+ assert self.config.get("plugin", "value", normalUser) == "default"
+
+ assert self.config.set("plugin", "value", False, user=normalUser)
+ assert self.config.get("plugin", "value", normalUser) is False
+ assert self.config["plugin"]["value"] == "default"
+
+ assert self.config.set("plugin", "value", True, user=adminUser)
+ assert self.config.get("plugin", "value", adminUser) is True
+ assert self.config["plugin"]["value"] is True
+ assert self.config.get("plugin", "value", normalUser) is False
+
+ self.config.delete("plugin", normalUser)
+ assert self.config.get("plugin", "value", normalUser) == "default"
+
+ self.config.delete("plugin")
+ assert self.config.get("plugin", "value", adminUser) == "default"
+ assert self.config["plugin"]["value"] == "default"
+
+ # should not trigger something
+ self.config.delete("foo")
+
+ def test_sections(self):
+ self.addConfig()
+
+ i = 0
+ # there should be only one section, with no values
+ for name, config, values in self.config.iterSections(adminUser):
+ assert name == "plugin"
+ assert values == {}
+ i +=1
+ assert i == 1
+
+ assert self.config.set("plugin", "value", True, user=adminUser)
+
+ i = 0
+ # now we assert the correct values
+ for name, config, values in self.config.iterSections(adminUser):
+ assert name == "plugin"
+ assert values == {"value":True}
+ i +=1
+ assert i == 1
+
+ def test_get_section(self):
+ self.addConfig()
+ assert self.config.getSection("plugin")[0].name == "Name"
+
+ # TODO: more save tests are needed
+ def test_saveValues(self):
+ self.addConfig()
+ self.config.saveValues(adminUser, "plugin")
+
+ @raises(InvalidConfigSection)
+ def test_restricted_access(self):
+ self.config.get("general", "language", normalUser)
+
+ @raises(InvalidConfigSection)
+ def test_error(self):
+ self.config.get("foo", "bar")
+
+ @raises(InvalidConfigSection)
+ def test_error_set(self):
+ self.config.set("foo", "bar", True) \ No newline at end of file
diff --git a/tests/manager/test_filemanager.py b/tests/manager/test_filemanager.py
new file mode 100644
index 000000000..a7507cada
--- /dev/null
+++ b/tests/manager/test_filemanager.py
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+
+from random import choice
+
+from tests.helper.Stubs import Core
+from tests.helper.BenchmarkTest import BenchmarkTest
+
+from pyload.database import DatabaseBackend
+# disable asyncronous queries
+DatabaseBackend.async = DatabaseBackend.queue
+
+from pyload.Api import DownloadState
+from pyload.FileManager import FileManager
+
+
+class TestFileManager(BenchmarkTest):
+ bench = ["add_packages", "add_files", "get_files_root", "get",
+ "get_package_content", "get_package_tree",
+ "order_package", "order_files", "move"]
+
+ pids = [-1]
+ count = 100
+
+ @classmethod
+ def setUpClass(cls):
+ c = Core()
+ # db needs seperate initialisation
+ cls.db = c.db = DatabaseBackend(c)
+ cls.db.setup()
+ cls.db.purgeAll()
+
+ cls.m = cls.db.manager = FileManager(c)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.db.purgeAll()
+ cls.db.shutdown()
+
+
+ # benchmarker ignore setup
+ def setUp(self):
+ self.db.purgeAll()
+ self.pids = [-1]
+
+ self.count = 20
+ self.test_add_packages()
+ self.test_add_files()
+
+ def test_add_packages(self):
+ for i in range(100):
+ pid = self.m.addPackage("name", "folder", choice(self.pids), "", "", "", False)
+ self.pids.append(pid)
+
+ if -1 in self.pids:
+ self.pids.remove(-1)
+
+ def test_add_files(self):
+ for pid in self.pids:
+ self.m.addLinks([("plugin %d" % i, "url %s" % i) for i in range(self.count)], pid)
+
+ count = self.m.getQueueStats()[0]
+ files = self.count * len(self.pids)
+ # in test runner files get added twice
+ assert count == files or count == files * 2
+
+ def test_get(self):
+ info = self.m.getPackageInfo(choice(self.pids))
+ assert info.stats.linkstotal == self.count
+
+ fid = choice(info.fids)
+ f = self.m.getFile(fid)
+ assert f.fid in self.m.files
+
+ f.name = "new name"
+ f.sync()
+ finfo = self.m.getFileInfo(fid)
+ assert finfo is not None
+ assert finfo.name == "new name"
+
+ p = self.m.getPackage(choice(self.pids))
+ assert p is not None
+ assert p.pid in self.m.packages
+ p.sync()
+
+ p.delete()
+
+ self.m.getTree(-1, True, None)
+
+ def test_get_filtered(self):
+ all = self.m.getTree(-1, True, None)
+ finished = self.m.getTree(-1, True, DownloadState.Finished)
+ unfinished = self.m.getTree(-1, True, DownloadState.Unfinished)
+
+ assert len(finished.files) + len(unfinished.files) == len(all.files) == self.m.db.filecount()
+
+
+ def test_get_files_root(self):
+ view = self.m.getTree(-1, True, None)
+
+ for pid in self.pids:
+ assert pid in view.packages
+
+ assert len(view.packages) == len(self.pids)
+
+ p = choice(view.packages.values())
+ assert len(p.fids) == self.count
+ assert p.stats.linkstotal == self.count
+
+
+ def test_get_package_content(self):
+ view = self.m.getTree(choice(self.pids), False, None)
+ p = view.root
+
+ assert len(view.packages) == len(p.pids)
+ for pid in p.pids: assert pid in view.packages
+
+ def test_get_package_tree(self):
+ view = self.m.getTree(choice(self.pids), True, None)
+ for pid in view.root.pids: assert pid in view.packages
+ for fid in view.root.fids: assert fid in view.files
+
+ def test_delete(self):
+ self.m.deleteFile(self.count * 5)
+ self.m.deletePackage(choice(self.pids))
+
+ def test_order_package(self):
+ parent = self.m.addPackage("order", "", -1, "", "", "", False)
+ self.m.addLinks([("url", "plugin") for i in range(100)], parent)
+
+ pids = [self.m.addPackage("c", "", parent, "", "", "", False) for i in range(5)]
+ v = self.m.getTree(parent, False, None)
+ self.assert_ordered(pids, 0, 5, v.root.pids, v.packages, True)
+
+ pid = v.packages.keys()[0]
+ self.assert_pack_ordered(parent, pid, 3)
+ self.assert_pack_ordered(parent, pid, 0)
+ self.assert_pack_ordered(parent, pid, 0)
+ self.assert_pack_ordered(parent, pid, 4)
+ pid = v.packages.keys()[2]
+ self.assert_pack_ordered(parent, pid, 4)
+ self.assert_pack_ordered(parent, pid, 3)
+ self.assert_pack_ordered(parent, pid, 2)
+
+
+ def test_order_files(self):
+ parent = self.m.addPackage("order", "", -1, "", "", "", False)
+ self.m.addLinks([("url", "plugin") for i in range(100)], parent)
+ v = self.m.getTree(parent, False, None)
+
+ fids = v.root.fids[10:20]
+ v = self.assert_files_ordered(parent, fids, 0)
+
+ fids = v.root.fids[20:30]
+
+ self.m.orderFiles(fids, parent, 99)
+ v = self.m.getTree(parent, False, None)
+ assert fids[-1] == v.root.fids[-1]
+ assert fids[0] == v.root.fids[90]
+ self.assert_ordered(fids, 90, 100, v.root.fids, v.files)
+
+ fids = v.root.fids[80:]
+ v = self.assert_files_ordered(parent, fids, 20)
+
+ self.m.orderFiles(fids, parent, 80)
+ v = self.m.getTree(parent, False, None)
+ self.assert_ordered(fids, 61, 81, v.root.fids, v.files)
+
+ fids = v.root.fids[50:51]
+ self.m.orderFiles(fids, parent, 99)
+ v = self.m.getTree(parent, False, None)
+ self.assert_ordered(fids, 99, 100, v.root.fids, v.files)
+
+ fids = v.root.fids[50:51]
+ v = self.assert_files_ordered(parent, fids, 0)
+
+
+ def assert_files_ordered(self, parent, fids, pos):
+ fs = [self.m.getFile(choice(fids)) for i in range(5)]
+ self.m.orderFiles(fids, parent, pos)
+ v = self.m.getTree(parent, False, False)
+ self.assert_ordered(fids, pos, pos+len(fids), v.root.fids, v.files)
+
+ return v
+
+ def assert_pack_ordered(self, parent, pid, pos):
+ self.m.orderPackage(pid, pos)
+ v = self.m.getTree(parent, False, False)
+ self.assert_ordered([pid], pos, pos+1, v.root.pids, v.packages, True)
+
+ # assert that ordering is total, complete with no gaps
+ def assert_ordered(self, part, start, end, data, dict, pack=False):
+ assert data[start:end] == part
+ if pack:
+ assert sorted([p.packageorder for p in dict.values()]) == range(len(dict))
+ assert [dict[pid].packageorder for pid in part] == range(start, end)
+ else:
+ assert sorted([f.fileorder for f in dict.values()]) == range(len(dict))
+ assert [dict[fid].fileorder for fid in part] == range(start, end)
+
+
+ def test_move(self):
+
+ pid = self.pids[-1]
+ pid2 = self.pids[1]
+
+ self.m.movePackage(pid, -1)
+ v = self.m.getTree(-1, False, False)
+
+ assert v.root.pids[-1] == pid
+ assert sorted([p.packageorder for p in v.packages.values()]) == range(len(v.packages))
+
+ v = self.m.getTree(pid, False, False)
+ fids = v.root.fids[10:20]
+ self.m.moveFiles(fids, pid2)
+ v = self.m.getTree(pid2, False, False)
+
+ assert sorted([f.fileorder for f in v.files.values()]) == range(len(v.files))
+ assert len(v.files) == self.count + len(fids)
+
+
+
+if __name__ == "__main__":
+ TestFileManager.benchmark() \ No newline at end of file
diff --git a/tests/manager/test_interactionManager.py b/tests/manager/test_interactionManager.py
new file mode 100644
index 000000000..5ff74c0f0
--- /dev/null
+++ b/tests/manager/test_interactionManager.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+
+from unittest import TestCase
+
+from tests.helper.Stubs import Core
+
+from pyload.Api import InputType, Interaction
+from pyload.interaction.InteractionManager import InteractionManager
+
+
+class TestInteractionManager(TestCase):
+ ADMIN = None
+ USER = 1
+
+ def assertEmpty(self, list1):
+ return self.assertListEqual(list1, [])
+
+ @classmethod
+ def setUpClass(cls):
+ cls.core = Core()
+
+ def setUp(self):
+ self.im = InteractionManager(self.core)
+
+ self.assertFalse(self.im.isClientConnected(self.ADMIN))
+ self.assertFalse(self.im.isTaskWaiting(self.ADMIN))
+ self.assertEmpty(self.im.getTasks(self.ADMIN))
+
+ def test_notifications(self):
+ n = self.im.createNotification("test", "notify")
+
+ self.assertTrue(self.im.isTaskWaiting(self.ADMIN))
+ self.assertListEqual(self.im.getTasks(self.ADMIN), [n])
+
+ n.seen = True
+ self.assertFalse(self.im.isTaskWaiting(self.ADMIN))
+
+ for i in range(10):
+ self.im.createNotification("title", "test")
+
+ self.assertEqual(len(self.im.getTasks(self.ADMIN)), 11)
+ self.assertFalse(self.im.getTasks(self.USER))
+ self.assertFalse(self.im.getTasks(self.ADMIN, Interaction.Query))
+
+
+ def test_captcha(self):
+ t = self.im.createCaptchaTask("1", "png", "", owner=self.ADMIN)
+
+ self.assertEqual(t.type, Interaction.Captcha)
+ self.assertListEqual(self.im.getTasks(self.ADMIN), [t])
+ self.assertEmpty(self.im.getTasks(self.USER))
+ t.setShared()
+ self.assertListEqual(self.im.getTasks(self.USER), [t])
+
+ t2 = self.im.createCaptchaTask("2", "png", "", owner=self.USER)
+ self.assertTrue(self.im.isTaskWaiting(self.USER))
+ self.assertEmpty(self.im.getTasks(self.USER, Interaction.Query))
+ self.im.removeTask(t)
+
+ self.assertListEqual(self.im.getTasks(self.ADMIN), [t2])
+ self.assertIs(self.im.getTaskByID(t2.iid), t2)
+
+
+ def test_query(self):
+ t = self.im.createQueryTask(InputType.Text, "text", owner=self.ADMIN)
+
+ self.assertEqual(t.description, "text")
+ self.assertListEqual(self.im.getTasks(self.ADMIN, Interaction.Query), [t])
+ self.assertEmpty(self.im.getTasks(Interaction.Captcha))
+
+
+ def test_clients(self):
+ self.im.getTasks(self.ADMIN, Interaction.Captcha)
+
+ self.assertTrue(self.im.isClientConnected(self.ADMIN))
+ self.assertFalse(self.im.isClientConnected(self.USER))
+
+
+ def test_users(self):
+ t = self.im.createCaptchaTask("1", "png", "", owner=self.USER)
+ self.assertListEqual(self.im.getTasks(self.ADMIN), [t])