summaryrefslogtreecommitdiffstats
path: root/pyload/database/DatabaseBackend.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/database/DatabaseBackend.py')
-rw-r--r--pyload/database/DatabaseBackend.py500
1 files changed, 500 insertions, 0 deletions
diff --git a/pyload/database/DatabaseBackend.py b/pyload/database/DatabaseBackend.py
new file mode 100644
index 000000000..2244a3026
--- /dev/null
+++ b/pyload/database/DatabaseBackend.py
@@ -0,0 +1,500 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+###############################################################################
+# Copyright(c) 2008-2012 pyLoad Team
+# http://www.pyload.org
+#
+# This file is part of pyLoad.
+# pyLoad is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# Subjected to the terms and conditions in LICENSE
+#
+# @author: RaNaN, mkaay
+###############################################################################
+
+from threading import Thread, Event
+from shutil import move
+
+from Queue import Queue
+from traceback import print_exc
+
+from pyload.utils.fs import chmod, exists, remove
+
+try:
+ from pysqlite2 import dbapi2 as sqlite3
+except:
+ import sqlite3
+
+DB = None
+DB_VERSION = 6
+
+
+def set_DB(db):
+ global DB
+ DB = db
+
+
+def queue(f):
+ @staticmethod
+ def x(*args, **kwargs):
+ if DB:
+ return DB.queue(f, *args, **kwargs)
+
+ return x
+
+
+def async(f):
+ @staticmethod
+ def x(*args, **kwargs):
+ if DB:
+ return DB.async(f, *args, **kwargs)
+
+ return x
+
+
+def inner(f):
+ @staticmethod
+ def x(*args, **kwargs):
+ if DB:
+ return f(DB, *args, **kwargs)
+
+ return x
+
+
+class DatabaseMethods:
+ # stubs for autocompletion
+ core = None
+ manager = None
+ conn = None
+ c = None
+
+ @classmethod
+ def register(cls):
+ DatabaseBackend.registerSub(cls)
+
+
+class DatabaseJob():
+ def __init__(self, f, *args, **kwargs):
+ self.done = Event()
+
+ self.f = f
+ self.args = args
+ self.kwargs = kwargs
+
+ self.result = None
+ self.exception = False
+
+ # import inspect
+ # self.frame = inspect.currentframe()
+
+ def __repr__(self):
+ from os.path import basename
+
+ frame = self.frame.f_back
+ output = ""
+ for i in range(5):
+ output += "\t%s:%s, %s\n" % (basename(frame.f_code.co_filename), frame.f_lineno, frame.f_code.co_name)
+ frame = frame.f_back
+ del frame
+ del self.frame
+
+ return "DataBase Job %s:%s\n%sResult: %s" % (self.f.__name__, self.args[1:], output, self.result)
+
+ def processJob(self):
+ try:
+ self.result = self.f(*self.args, **self.kwargs)
+ except Exception, e:
+ print_exc()
+ try:
+ print "Database Error @", self.f.__name__, self.args[1:], self.kwargs, e
+ except:
+ pass
+
+ self.exception = e
+ finally:
+ self.done.set()
+
+ def wait(self):
+ self.done.wait()
+
+
+class DatabaseBackend(Thread):
+ subs = []
+
+ DB_FILE = "pyload.db"
+ VERSION_FILE = "db.version"
+
+ def __init__(self, core):
+ Thread.__init__(self)
+ self.setDaemon(True)
+ self.core = core
+ self.manager = None # set later
+ self.error = None
+ self.running = Event()
+
+ self.jobs = Queue()
+
+ set_DB(self)
+
+ def setup(self):
+ """ *MUST* be called before db can be used !"""
+ self.start()
+ self.running.wait()
+
+ def init(self):
+ """main loop, which executes commands"""
+
+ version = self._checkVersion()
+
+ self.conn = sqlite3.connect(self.DB_FILE)
+ chmod(self.DB_FILE, 0600)
+
+ self.c = self.conn.cursor()
+
+ if version is not None and version < DB_VERSION:
+ success = self._convertDB(version)
+
+ # delete database
+ if not success:
+ self.c.close()
+ self.conn.close()
+
+ try:
+ self.manager.core.log.warning(_("Database was deleted due to incompatible version."))
+ except:
+ print "Database was deleted due to incompatible version."
+
+ remove(self.VERSION_FILE)
+ move(self.DB_FILE, self.DB_FILE + ".backup")
+ f = open(self.VERSION_FILE, "wb")
+ f.write(str(DB_VERSION))
+ f.close()
+
+ self.conn = sqlite3.connect(self.DB_FILE)
+ chmod(self.DB_FILE, 0600)
+ self.c = self.conn.cursor()
+
+ self._createTables()
+ self.conn.commit()
+
+
+ def run(self):
+ try:
+ self.init()
+ except Exception, e:
+ self.error = e
+ finally:
+ self.running.set()
+
+ while True:
+ j = self.jobs.get()
+ if j == "quit":
+ self.c.close()
+ self.conn.commit()
+ self.conn.close()
+ self.closing.set()
+ break
+ j.processJob()
+
+
+ def shutdown(self):
+ self.running.clear()
+ self.closing = Event()
+ self.jobs.put("quit")
+ self.closing.wait(1)
+
+ def _checkVersion(self):
+ """ get db version"""
+ if not exists(self.VERSION_FILE):
+ f = open(self.VERSION_FILE, "wb")
+ f.write(str(DB_VERSION))
+ f.close()
+ return
+
+ f = open(self.VERSION_FILE, "rb")
+ v = int(f.read().strip())
+ f.close()
+
+ return v
+
+ def _convertDB(self, v):
+ try:
+ return getattr(self, "_convertV%i" % v)()
+ except:
+ return False
+
+ #--convert scripts start
+
+ def _convertV6(self):
+ return False
+
+ #--convert scripts end
+
+ def _createTables(self):
+ """create tables for database"""
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "packages" ('
+ '"pid" INTEGER PRIMARY KEY AUTOINCREMENT, '
+ '"name" TEXT NOT NULL, '
+ '"folder" TEXT DEFAULT "" NOT NULL, '
+ '"site" TEXT DEFAULT "" NOT NULL, '
+ '"comment" TEXT DEFAULT "" NOT NULL, '
+ '"password" TEXT DEFAULT "" NOT NULL, '
+ '"added" INTEGER DEFAULT 0 NOT NULL,' # set by trigger
+ '"status" INTEGER DEFAULT 0 NOT NULL,'
+ '"tags" TEXT DEFAULT "" NOT NULL,'
+ '"shared" INTEGER DEFAULT 0 NOT NULL,'
+ '"packageorder" INTEGER DEFAULT -1 NOT NULL,' #incremented by trigger
+ '"root" INTEGER DEFAULT -1 NOT NULL, '
+ '"owner" INTEGER NOT NULL, '
+ 'FOREIGN KEY(owner) REFERENCES users(uid), '
+ 'CHECK (root != pid)'
+ ')'
+ )
+
+ self.c.execute(
+ 'CREATE TRIGGER IF NOT EXISTS "insert_package" AFTER INSERT ON "packages"'
+ 'BEGIN '
+ 'UPDATE packages SET added = strftime("%s", "now"), '
+ 'packageorder = (SELECT max(p.packageorder) + 1 FROM packages p WHERE p.root=new.root) '
+ 'WHERE rowid = new.rowid;'
+ 'END'
+ )
+
+ self.c.execute(
+ 'CREATE TRIGGER IF NOT EXISTS "delete_package" AFTER DELETE ON "packages"'
+ 'BEGIN '
+ 'DELETE FROM files WHERE package = old.pid;'
+ 'UPDATE packages SET packageorder=packageorder-1 WHERE packageorder > old.packageorder AND root=old.pid;'
+ 'END'
+ )
+ self.c.execute('CREATE INDEX IF NOT EXISTS "package_index" ON packages(root, owner)')
+ self.c.execute('CREATE INDEX IF NOT EXISTS "package_owner" ON packages(owner)')
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "files" ('
+ '"fid" INTEGER PRIMARY KEY AUTOINCREMENT, '
+ '"name" TEXT NOT NULL, '
+ '"size" INTEGER DEFAULT 0 NOT NULL, '
+ '"status" INTEGER DEFAULT 0 NOT NULL, '
+ '"media" INTEGER DEFAULT 1 NOT NULL,'
+ '"added" INTEGER DEFAULT 0 NOT NULL,'
+ '"fileorder" INTEGER DEFAULT -1 NOT NULL, '
+ '"url" TEXT DEFAULT "" NOT NULL, '
+ '"plugin" TEXT DEFAULT "" NOT NULL, '
+ '"hash" TEXT DEFAULT "" NOT NULL, '
+ '"dlstatus" INTEGER DEFAULT 0 NOT NULL, '
+ '"error" TEXT DEFAULT "" NOT NULL, '
+ '"package" INTEGER NOT NULL, '
+ '"owner" INTEGER NOT NULL, '
+ 'FOREIGN KEY(owner) REFERENCES users(uid), '
+ 'FOREIGN KEY(package) REFERENCES packages(id)'
+ ')'
+ )
+ self.c.execute('CREATE INDEX IF NOT EXISTS "file_index" ON files(package, owner)')
+ self.c.execute('CREATE INDEX IF NOT EXISTS "file_owner" ON files(owner)')
+
+ self.c.execute(
+ 'CREATE TRIGGER IF NOT EXISTS "insert_file" AFTER INSERT ON "files"'
+ 'BEGIN '
+ 'UPDATE files SET added = strftime("%s", "now"), '
+ 'fileorder = (SELECT max(f.fileorder) + 1 FROM files f WHERE f.package=new.package) '
+ 'WHERE rowid = new.rowid;'
+ 'END'
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "collector" ('
+ '"owner" INTEGER NOT NULL, '
+ '"data" TEXT NOT NULL, '
+ 'FOREIGN KEY(owner) REFERENCES users(uid), '
+ 'PRIMARY KEY(owner) ON CONFLICT REPLACE'
+ ') '
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "storage" ('
+ '"identifier" TEXT NOT NULL, '
+ '"key" TEXT NOT NULL, '
+ '"value" TEXT DEFAULT "", '
+ 'PRIMARY KEY (identifier, key) ON CONFLICT REPLACE'
+ ')'
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "users" ('
+ '"uid" INTEGER PRIMARY KEY AUTOINCREMENT, '
+ '"name" TEXT NOT NULL UNIQUE, '
+ '"email" TEXT DEFAULT "" NOT NULL, '
+ '"password" TEXT NOT NULL, '
+ '"role" INTEGER DEFAULT 0 NOT NULL, '
+ '"permission" INTEGER DEFAULT 0 NOT NULL, '
+ '"folder" TEXT DEFAULT "" NOT NULL, '
+ '"traffic" INTEGER DEFAULT -1 NOT NULL, '
+ '"dllimit" INTEGER DEFAULT -1 NOT NULL, '
+ '"dlquota" TEXT DEFAULT "" NOT NULL, '
+ '"hddquota" INTEGER DEFAULT -1 NOT NULL, '
+ '"template" TEXT DEFAULT "default" NOT NULL, '
+ '"user" INTEGER DEFAULT -1 NOT NULL, ' # set by trigger to self
+ 'FOREIGN KEY(user) REFERENCES users(uid)'
+ ')'
+ )
+ self.c.execute('CREATE INDEX IF NOT EXISTS "username_index" ON users(name)')
+
+ self.c.execute(
+ 'CREATE TRIGGER IF NOT EXISTS "insert_user" AFTER INSERT ON "users"'
+ 'BEGIN '
+ 'UPDATE users SET user = new.uid, folder=new.name '
+ 'WHERE rowid = new.rowid;'
+ 'END'
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "settings" ('
+ '"plugin" TEXT NOT NULL, '
+ '"user" INTEGER DEFAULT -1 NOT NULL, '
+ '"config" TEXT NOT NULL, '
+ 'FOREIGN KEY(user) REFERENCES users(uid), '
+ 'PRIMARY KEY (plugin, user) ON CONFLICT REPLACE'
+ ')'
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "accounts" ('
+ '"plugin" TEXT NOT NULL, '
+ '"loginname" TEXT NOT NULL, '
+ '"owner" INTEGER NOT NULL DEFAULT -1, '
+ '"activated" INTEGER DEFAULT 1, '
+ '"password" TEXT DEFAULT "", '
+ '"shared" INTEGER DEFAULT 0, '
+ '"options" TEXT DEFAULT "", '
+ 'FOREIGN KEY(owner) REFERENCES users(uid), '
+ 'PRIMARY KEY (plugin, loginname, owner) ON CONFLICT REPLACE'
+ ')'
+ )
+
+ self.c.execute(
+ 'CREATE TABLE IF NOT EXISTS "stats" ('
+ '"user" INTEGER NOT NULL, '
+ '"plugin" TEXT NOT NULL, '
+ '"time" INTEGER NOT NULL, '
+ '"premium" INTEGER DEFAULT 0 NOT NULL, '
+ '"amount" INTEGER DEFAULT 0 NOT NULL, '
+ 'FOREIGN KEY(user) REFERENCES users(uid), '
+ 'PRIMARY KEY(user, plugin, time)'
+ ')'
+ )
+ self.c.execute('CREATE INDEX IF NOT EXISTS "stats_time" ON stats(time)')
+
+ #try to lower ids
+ self.c.execute('SELECT max(fid) FROM files')
+ fid = self.c.fetchone()[0]
+ fid = int(fid) if fid else 0
+ self.c.execute('UPDATE SQLITE_SEQUENCE SET seq=? WHERE name=?', (fid, "files"))
+
+ self.c.execute('SELECT max(pid) FROM packages')
+ pid = self.c.fetchone()[0]
+ pid = int(pid) if pid else 0
+ self.c.execute('UPDATE SQLITE_SEQUENCE SET seq=? WHERE name=?', (pid, "packages"))
+
+ self.c.execute('VACUUM')
+
+
+ def createCursor(self):
+ return self.conn.cursor()
+
+ @async
+ def commit(self):
+ self.conn.commit()
+
+ @queue
+ def syncSave(self):
+ self.conn.commit()
+
+ @async
+ def rollback(self):
+ self.conn.rollback()
+
+ def async(self, f, *args, **kwargs):
+ args = (self, ) + args
+ job = DatabaseJob(f, *args, **kwargs)
+ self.jobs.put(job)
+
+ def queue(self, f, *args, **kwargs):
+ # Raise previous error of initialization
+ if self.error: raise self.error
+ args = (self, ) + args
+ job = DatabaseJob(f, *args, **kwargs)
+ self.jobs.put(job)
+
+ # only wait when db is running
+ if self.running.isSet(): job.wait()
+ return job.result
+
+ @classmethod
+ def registerSub(cls, klass):
+ cls.subs.append(klass)
+
+ @classmethod
+ def unregisterSub(cls, klass):
+ cls.subs.remove(klass)
+
+ def __getattr__(self, attr):
+ for sub in DatabaseBackend.subs:
+ if hasattr(sub, attr):
+ return getattr(sub, attr)
+ raise AttributeError(attr)
+
+
+if __name__ == "__main__":
+ db = DatabaseBackend()
+ db.setup()
+
+ class Test():
+ @queue
+ def insert(db):
+ c = db.createCursor()
+ for i in range(1000):
+ c.execute("INSERT INTO storage (identifier, key, value) VALUES (?, ?, ?)", ("foo", i, "bar"))
+
+ @async
+ def insert2(db):
+ c = db.createCursor()
+ for i in range(1000 * 1000):
+ c.execute("INSERT INTO storage (identifier, key, value) VALUES (?, ?, ?)", ("foo", i, "bar"))
+
+ @queue
+ def select(db):
+ c = db.createCursor()
+ for i in range(10):
+ res = c.execute("SELECT value FROM storage WHERE identifier=? AND key=?", ("foo", i))
+ print res.fetchone()
+
+ @queue
+ def error(db):
+ c = db.createCursor()
+ print "a"
+ c.execute("SELECT myerror FROM storage WHERE identifier=? AND key=?", ("foo", i))
+ print "e"
+
+ db.registerSub(Test)
+ from time import time
+
+ start = time()
+ for i in range(100):
+ db.insert()
+ end = time()
+ print end - start
+
+ start = time()
+ db.insert2()
+ end = time()
+ print end - start
+
+ db.error()
+