diff options
Diffstat (limited to 'pyload/database/DatabaseBackend.py')
| -rw-r--r-- | pyload/database/DatabaseBackend.py | 492 | 
1 files changed, 492 insertions, 0 deletions
| diff --git a/pyload/database/DatabaseBackend.py b/pyload/database/DatabaseBackend.py new file mode 100644 index 000000000..99a406d9c --- /dev/null +++ b/pyload/database/DatabaseBackend.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +############################################################################### +#   Copyright(c) 2008-2012 pyLoad Team +#   http://www.pyload.org +# +#   This file is part of pyLoad. +#   pyLoad is free software: you can redistribute it and/or modify +#   it under the terms of the GNU Affero General Public License as +#   published by the Free Software Foundation, either version 3 of the +#   License, or (at your option) any later version. +# +#   Subjected to the terms and conditions in LICENSE +# +#   @author: RaNaN, mkaay +############################################################################### + +from threading import Thread, Event +from shutil import move + +from Queue import Queue +from traceback import print_exc + +from pyload.utils.fs import chmod, exists, remove + +try: +    from pysqlite2 import dbapi2 as sqlite3 +except: +    import sqlite3 + +DB = None +DB_VERSION = 6 + +def set_DB(db): +    global DB +    DB = db + + +def queue(f): +    @staticmethod +    def x(*args, **kwargs): +        if DB: +            return DB.queue(f, *args, **kwargs) + +    return x + + +def async(f): +    @staticmethod +    def x(*args, **kwargs): +        if DB: +            return DB.async(f, *args, **kwargs) + +    return x + + +def inner(f): +    @staticmethod +    def x(*args, **kwargs): +        if DB: +            return f(DB, *args, **kwargs) + +    return x + + +class DatabaseMethods: +    # stubs for autocompletion +    core = None +    manager = None +    conn = None +    c = None + +    @classmethod +    def register(cls): +        DatabaseBackend.registerSub(cls) + + +class DatabaseJob(): +    def __init__(self, f, *args, **kwargs): +        self.done = Event() + +        self.f = f +        self.args = args +        self.kwargs = kwargs + +        self.result = None +        self.exception = False + +    #        import inspect +    #        self.frame = inspect.currentframe() + +    def __repr__(self): +        from os.path import basename + +        frame = self.frame.f_back +        output = "" +        for i in range(5): +            output += "\t%s:%s, %s\n" % (basename(frame.f_code.co_filename), frame.f_lineno, frame.f_code.co_name) +            frame = frame.f_back +        del frame +        del self.frame + +        return "DataBase Job %s:%s\n%sResult: %s" % (self.f.__name__, self.args[1:], output, self.result) + +    def processJob(self): +        try: +            self.result = self.f(*self.args, **self.kwargs) +        except Exception, e: +            print_exc() +            try: +                print "Database Error @", self.f.__name__, self.args[1:], self.kwargs, e +            except: +                pass + +            self.exception = e +        finally: +            self.done.set() + +    def wait(self): +        self.done.wait() + + +class DatabaseBackend(Thread): +    subs = [] + +    DB_FILE = "pyload.db" +    VERSION_FILE = "db.version" + +    def __init__(self, core): +        Thread.__init__(self) +        self.setDaemon(True) +        self.core = core +        self.manager = None # set later +        self.running = Event() + +        self.jobs = Queue() + +        set_DB(self) + +    def setup(self): +        """ *MUST* be called before db can be used !""" +        self.start() +        self.running.wait() + +    def init(self): +        """main loop, which executes commands""" + +        version = self._checkVersion() + +        self.conn = sqlite3.connect(self.DB_FILE) +        chmod(self.DB_FILE, 0600) + +        self.c = self.conn.cursor() + +        if version is not None and version < DB_VERSION: +            success = self._convertDB(version) + +            # delete database +            if not success: +                self.c.close() +                self.conn.close() + +                try: +                    self.manager.core.log.warning(_("Database was deleted due to incompatible version.")) +                except: +                    print "Database was deleted due to incompatible version." + +                remove(self.VERSION_FILE) +                move(self.DB_FILE, self.DB_FILE + ".backup") +                f = open(self.VERSION_FILE, "wb") +                f.write(str(DB_VERSION)) +                f.close() + +                self.conn = sqlite3.connect(self.DB_FILE) +                chmod(self.DB_FILE, 0600) +                self.c = self.conn.cursor() + +        self._createTables() +        self.conn.commit() + + +    def run(self): +        try: +            self.init() +        finally: +            self.running.set() + +        while True: +            j = self.jobs.get() +            if j == "quit": +                self.c.close() +                self.conn.commit() +                self.conn.close() +                self.closing.set() +                break +            j.processJob() + + +    def shutdown(self): +        self.running.clear() +        self.closing = Event() +        self.jobs.put("quit") +        self.closing.wait(1) + +    def _checkVersion(self): +        """ get db version""" +        if not exists(self.VERSION_FILE): +            f = open(self.VERSION_FILE, "wb") +            f.write(str(DB_VERSION)) +            f.close() +            return + +        f = open(self.VERSION_FILE, "rb") +        v = int(f.read().strip()) +        f.close() + +        return v + +    def _convertDB(self, v): +        try: +            return getattr(self, "_convertV%i" % v)() +        except: +            return False + +    #--convert scripts start + +    def _convertV6(self): +        return False + +    #--convert scripts end + +    def _createTables(self): +        """create tables for database""" + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "packages" (' +            '"pid" INTEGER PRIMARY KEY AUTOINCREMENT, ' +            '"name" TEXT NOT NULL, ' +            '"folder" TEXT DEFAULT "" NOT NULL, ' +            '"site" TEXT DEFAULT "" NOT NULL, ' +            '"comment" TEXT DEFAULT "" NOT NULL, ' +            '"password" TEXT DEFAULT "" NOT NULL, ' +            '"added" INTEGER DEFAULT 0 NOT NULL,' # set by trigger +            '"status" INTEGER DEFAULT 0 NOT NULL,' +            '"tags" TEXT DEFAULT "" NOT NULL,' +            '"shared" INTEGER DEFAULT 0 NOT NULL,' +            '"packageorder" INTEGER DEFAULT -1 NOT NULL,' #incremented by trigger +            '"root" INTEGER DEFAULT -1 NOT NULL, ' +            '"owner" INTEGER NOT NULL, ' +            'FOREIGN KEY(owner) REFERENCES users(uid), ' +            'CHECK (root != pid)' +            ')' +        ) + +        self.c.execute( +            'CREATE TRIGGER IF NOT EXISTS "insert_package" AFTER INSERT ON "packages"' +            'BEGIN ' +            'UPDATE packages SET added = strftime("%s", "now"), ' +            'packageorder = (SELECT max(p.packageorder) + 1 FROM packages p WHERE p.root=new.root) ' +            'WHERE rowid = new.rowid;' +            'END' +        ) + +        self.c.execute( +            'CREATE TRIGGER IF NOT EXISTS "delete_package" AFTER DELETE ON "packages"' +            'BEGIN ' +            'DELETE FROM files WHERE package = old.pid;' +            'UPDATE packages SET packageorder=packageorder-1 WHERE packageorder > old.packageorder AND root=old.pid;' +            'END' +        ) +        self.c.execute('CREATE INDEX IF NOT EXISTS "package_index" ON packages(root, owner)') +        self.c.execute('CREATE INDEX IF NOT EXISTS "package_owner" ON packages(owner)') + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "files" (' +            '"fid" INTEGER PRIMARY KEY AUTOINCREMENT, ' +            '"name" TEXT NOT NULL, ' +            '"size" INTEGER DEFAULT 0 NOT NULL, ' +            '"status" INTEGER DEFAULT 0 NOT NULL, ' +            '"media" INTEGER DEFAULT 1 NOT NULL,' +            '"added" INTEGER DEFAULT 0 NOT NULL,' +            '"fileorder" INTEGER DEFAULT -1 NOT NULL, ' +            '"url" TEXT DEFAULT "" NOT NULL, ' +            '"plugin" TEXT DEFAULT "" NOT NULL, ' +            '"hash" TEXT DEFAULT "" NOT NULL, ' +            '"dlstatus" INTEGER DEFAULT 0 NOT NULL, ' +            '"error" TEXT DEFAULT "" NOT NULL, ' +            '"package" INTEGER NOT NULL, ' +            '"owner" INTEGER NOT NULL, ' +            'FOREIGN KEY(owner) REFERENCES users(uid), ' +            'FOREIGN KEY(package) REFERENCES packages(id)' +            ')' +        ) +        self.c.execute('CREATE INDEX IF NOT EXISTS "file_index" ON files(package, owner)') +        self.c.execute('CREATE INDEX IF NOT EXISTS "file_owner" ON files(owner)') + +        self.c.execute( +            'CREATE TRIGGER IF NOT EXISTS "insert_file" AFTER INSERT ON "files"' +            'BEGIN ' +            'UPDATE files SET added = strftime("%s", "now"), ' +            'fileorder = (SELECT max(f.fileorder) + 1 FROM files f WHERE f.package=new.package) ' +            'WHERE rowid = new.rowid;' +            'END' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "collector" (' +            '"owner" INTEGER NOT NULL, ' +            '"data" TEXT NOT NULL, ' +            'FOREIGN KEY(owner) REFERENCES users(uid), ' +            'PRIMARY KEY(owner) ON CONFLICT REPLACE' +            ') ' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "storage" (' +            '"identifier" TEXT NOT NULL, ' +            '"key" TEXT NOT NULL, ' +            '"value" TEXT DEFAULT "", ' +            'PRIMARY KEY (identifier, key) ON CONFLICT REPLACE' +            ')' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "users" (' +            '"uid" INTEGER PRIMARY KEY AUTOINCREMENT, ' +            '"name" TEXT NOT NULL UNIQUE, ' +            '"email" TEXT DEFAULT "" NOT NULL, ' +            '"password" TEXT NOT NULL, ' +            '"role" INTEGER DEFAULT 0 NOT NULL, ' +            '"permission" INTEGER DEFAULT 0 NOT NULL, ' +            '"folder" TEXT DEFAULT "" NOT NULL, ' +            '"traffic" INTEGER DEFAULT -1 NOT NULL, ' +            '"dllimit" INTEGER DEFAULT -1 NOT NULL, ' +            '"dlquota" TEXT DEFAULT "" NOT NULL, ' +            '"hddquota" INTEGER DEFAULT -1 NOT NULL, ' +            '"template" TEXT DEFAULT "default" NOT NULL, ' +            '"user" INTEGER DEFAULT -1 NOT NULL, ' # set by trigger to self +            'FOREIGN KEY(user) REFERENCES users(uid)' +            ')' +        ) +        self.c.execute('CREATE INDEX IF NOT EXISTS "username_index" ON users(name)') + +        self.c.execute( +            'CREATE TRIGGER IF NOT EXISTS "insert_user" AFTER INSERT ON "users"' +            'BEGIN ' +            'UPDATE users SET user = new.uid, folder=new.name ' +            'WHERE rowid = new.rowid;' +            'END' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "settings" (' +            '"plugin" TEXT NOT NULL, ' +            '"user" INTEGER DEFAULT -1 NOT NULL, ' +            '"config" TEXT NOT NULL, ' +            'FOREIGN KEY(user) REFERENCES users(uid), ' +            'PRIMARY KEY (plugin, user) ON CONFLICT REPLACE' +            ')' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "accounts" (' +            '"plugin" TEXT NOT NULL, ' +            '"loginname" TEXT NOT NULL, ' +            '"owner" INTEGER NOT NULL DEFAULT -1, ' +            '"activated" INTEGER DEFAULT 1, ' +            '"password" TEXT DEFAULT "", ' +            '"shared" INTEGER DEFAULT 0, ' +            '"options" TEXT DEFAULT "", ' +            'FOREIGN KEY(owner) REFERENCES users(uid), ' +            'PRIMARY KEY (plugin, loginname, owner) ON CONFLICT REPLACE' +            ')' +        ) + +        self.c.execute( +            'CREATE TABLE IF NOT EXISTS "stats" (' +            '"user" INTEGER NOT NULL, ' +            '"plugin" TEXT NOT NULL, ' +            '"time" INTEGER NOT NULL, ' +            '"premium" INTEGER DEFAULT 0 NOT NULL, ' +            '"amount" INTEGER DEFAULT 0 NOT NULL, ' +            'FOREIGN KEY(user) REFERENCES users(uid), ' +            'PRIMARY KEY(user, plugin, time)' +            ')' +        ) +        self.c.execute('CREATE INDEX IF NOT EXISTS "stats_time" ON stats(time)') + +        #try to lower ids +        self.c.execute('SELECT max(fid) FROM files') +        fid = self.c.fetchone()[0] +        fid = int(fid) if fid else 0 +        self.c.execute('UPDATE SQLITE_SEQUENCE SET seq=? WHERE name=?', (fid, "files")) + +        self.c.execute('SELECT max(pid) FROM packages') +        pid = self.c.fetchone()[0] +        pid = int(pid) if pid else 0 +        self.c.execute('UPDATE SQLITE_SEQUENCE SET seq=? WHERE name=?', (pid, "packages")) + +        self.c.execute('VACUUM') + + +    def createCursor(self): +        return self.conn.cursor() + +    @async +    def commit(self): +        self.conn.commit() + +    @queue +    def syncSave(self): +        self.conn.commit() + +    @async +    def rollback(self): +        self.conn.rollback() + +    def async(self, f, *args, **kwargs): +        args = (self, ) + args +        job = DatabaseJob(f, *args, **kwargs) +        self.jobs.put(job) + +    def queue(self, f, *args, **kwargs): +        args = (self, ) + args +        job = DatabaseJob(f, *args, **kwargs) +        self.jobs.put(job) +        # only wait when db is running +        if self.running.isSet(): job.wait() +        return job.result + +    @classmethod +    def registerSub(cls, klass): +        cls.subs.append(klass) + +    @classmethod +    def unregisterSub(cls, klass): +        cls.subs.remove(klass) + +    def __getattr__(self, attr): +        for sub in DatabaseBackend.subs: +            if hasattr(sub, attr): +                return getattr(sub, attr) +        raise AttributeError(attr) + +if __name__ == "__main__": +    db = DatabaseBackend() +    db.setup() + +    class Test(): +        @queue +        def insert(db): +            c = db.createCursor() +            for i in range(1000): +                c.execute("INSERT INTO storage (identifier, key, value) VALUES (?, ?, ?)", ("foo", i, "bar")) + +        @async +        def insert2(db): +            c = db.createCursor() +            for i in range(1000 * 1000): +                c.execute("INSERT INTO storage (identifier, key, value) VALUES (?, ?, ?)", ("foo", i, "bar")) + +        @queue +        def select(db): +            c = db.createCursor() +            for i in range(10): +                res = c.execute("SELECT value FROM storage WHERE identifier=? AND key=?", ("foo", i)) +                print res.fetchone() + +        @queue +        def error(db): +            c = db.createCursor() +            print "a" +            c.execute("SELECT myerror FROM storage WHERE identifier=? AND key=?", ("foo", i)) +            print "e" + +    db.registerSub(Test) +    from time import time + +    start = time() +    for i in range(100): +        db.insert() +    end = time() +    print end - start + +    start = time() +    db.insert2() +    end = time() +    print end - start + +    db.error() + | 
