summaryrefslogtreecommitdiffstats
path: root/module/FileDatabase.py
diff options
context:
space:
mode:
authorGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-07-29 12:09:42 +0200
committerGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-07-29 12:09:42 +0200
commit2ba07aa53d2af572af2c5a43e77725abd46e1b13 (patch)
tree89ff915a3a476dd9431d38c6e2b39b9de1aeb0f5 /module/FileDatabase.py
parentAdded tag working for changeset 3cca18acfe7d (diff)
downloadpyload-2ba07aa53d2af572af2c5a43e77725abd46e1b13.tar.xz
many new stuff, some things already working
Diffstat (limited to 'module/FileDatabase.py')
-rw-r--r--module/FileDatabase.py684
1 files changed, 684 insertions, 0 deletions
diff --git a/module/FileDatabase.py b/module/FileDatabase.py
new file mode 100644
index 000000000..843121492
--- /dev/null
+++ b/module/FileDatabase.py
@@ -0,0 +1,684 @@
+#!/usr/bin/env python
+"""
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+
+ @author: RaNaN
+"""
+from Queue import Queue
+import sqlite3
+from threading import Thread
+from threading import RLock
+from time import sleep
+from time import time
+from os import path
+import traceback
+
+statusMap = {
+ "finished": 0,
+ "offline": 1,
+ "online": 2,
+ "queued": 3,
+ "checking": 4,
+ "waiting": 5,
+ "reconnected": 6,
+ "starting": 7,
+ "failed": 8,
+ "aborted": 9,
+ "decrypting": 10,
+ "custom": 11,
+ "downloading": 12,
+ "processing": 13
+}
+
+########################################################################
+class FileHandler:
+ """Handles all request made to obtain information,
+ modify status or other request for links or packages"""
+
+
+ #----------------------------------------------------------------------
+ def __init__(self, core):
+ """Constructor"""
+ self.core = core
+
+ # translations
+ self.statusMsg = [_("finished"), _("offline"), _("online"), _("queued"), _("checking"), _("waiting"), _("reconnected"), _("starting"),_("failed"), _("aborted"), _("decrypting"), _("custom"),_("downloading"), _("processing")]
+
+ self.cache = {} #holds instances for files
+ self.packageCache = {} # same for packages
+ #@TODO: purge the cache
+
+ self.jobCache = {}
+
+ self.lock = RLock()
+
+ self.filecount = -1 # if an invalid value is set get current value from db
+ self.unchanged = False #determines if any changes was made since last call
+
+ self.db = FileDatabaseBackend(self) # the backend
+
+
+ def change(func):
+ def new(*args):
+ args[0].unchanged = False
+ args[0].filecount = -1
+ args[0].jobCache = {}
+ return func(*args)
+ return new
+
+ #----------------------------------------------------------------------
+ def save(self):
+ """saves all data to backend"""
+ self.db.commit()
+
+ #----------------------------------------------------------------------
+ def getCompleteData(self, queue=1):
+ """gets a complete data representation"""
+
+ data = self.db.getAllLinks(queue)
+ packs = self.db.getAllPackages(queue)
+
+ print data
+ print packs
+
+ data.update( [ (x.id, x.toDbDict()[x.id]) for x in self.cache.itervalues() ] )
+ packs.update( [ (x.id, x.toDict()[x.id]) for x in self.packageCache.itervalues() if x.queue == queue] )
+
+ for key, value in data.iteritems():
+ if packs.has_key(value["package"]):
+ packs[value["package"]]["links"][key] = value
+
+ return packs
+
+ #----------------------------------------------------------------------
+ @change
+ def addLinks(self, urls, package):
+ """adds links"""
+
+ # tuple of (url, name, plugin, package)
+ links = [ (x[0], x[0], x[1], package) for x in self.core.pluginManager.parseUrls(urls) ]
+
+ self.db.addLinks(links, package)
+
+
+ #----------------------------------------------------------------------
+ @change
+ def addPackage(self, name, folder, queue=0):
+ """adds a package, default to link collector"""
+ return self.db.addPackage(name, folder, queue)
+
+ #----------------------------------------------------------------------
+ @change
+ def deletePackage(self, id):
+ """delete package and all contained links"""
+
+ self.lock.acquire()
+
+ if self.packageCache.has_key(id):
+ del self.packageCache[id]
+
+ toDelete = []
+
+ for pyfile in self.cache.itervalues():
+ if pyfile.packageid == id:
+ pyfile.abort()
+ toDelete.append(pyfile.id)
+
+ for pid in toDelete:
+ del self.cache[pid]
+
+ self.db.deletePackage(id)
+
+
+ self.lock.release()
+
+ #----------------------------------------------------------------------
+ @change
+ def deleteLink(self, id):
+ """deletes links"""
+
+ self.lock.acquire()
+
+ if self.cache.has_key(id):
+ self.cache[id].abort()
+ del self.cache[id]
+
+ self.lock.release()
+
+ self.db.deleteLink(id)
+
+ #----------------------------------------------------------------------
+ def releaseLink(self, id):
+ """removes pyfile from cache"""
+ if self.cache.has_key(id):
+ del self.cache[id]
+
+ #----------------------------------------------------------------------
+ def releasePackage(self, id):
+ """removes package from cache"""
+ if self.packageCache.has_key(id):
+ del self.packageCache[id]
+
+ #----------------------------------------------------------------------
+ def updateLink(self, pyfile):
+ """updates link"""
+ self.db.updateLink(pyfile)
+
+ #----------------------------------------------------------------------
+ def updatePackage(self, pypack):
+ """updates a package"""
+ self.db.updatePackage(pypack)
+
+ #----------------------------------------------------------------------
+ def getPackage(self, id):
+ """return package instance"""
+
+ if self.packageCache.has_key(id):
+ return self.packageCache[id]
+ else:
+ return self.db.getPackage(id)
+
+ #----------------------------------------------------------------------
+ def getFile(self, id):
+ """returns pyfile instance"""
+ if self.cache.has_key(id):
+ return self.cache[id]
+ else:
+ return self.db.getFile(id)
+
+ #----------------------------------------------------------------------
+ def getJob(self, occ):
+ """get suitable job"""
+
+ self.lock.acquire()
+
+ if self.jobCache.has_key(occ):
+ pass
+ else:
+ self.jobCache = {} #better not caching to much
+ jobs = self.db.getJob(occ)
+ jobs.reverse()
+ self.jobCache[occ] = jobs
+
+ #@TODO: maybe the new job has to be approved...
+
+ if not self.jobCache[occ]:
+ pyfile = None
+ else:
+ pyfile = self.getFile(self.jobCache[occ].pop())
+
+ self.lock.release()
+ return pyfile
+
+
+ #----------------------------------------------------------------------
+ def getFileCount(self):
+ """returns number of files"""
+
+ if self.filecount == -1:
+ self.filecount = self.db.filecount(1)
+
+ return self.filecount
+
+
+
+#########################################################################
+class FileDatabaseBackend(Thread):
+ """underlying backend for the filehandler to save the data"""
+
+ def __init__(self, manager):
+ Thread.__init__(self)
+
+ self.setDaemon(True)
+
+ self.manager = manager
+
+ self.jobs = Queue() # queues for jobs
+ self.res = Queue()
+
+ self.start()
+
+
+ def queue(func):
+ """use as decorator when fuction directly executes sql commands"""
+ def new(*args):
+ args[0].jobs.put((func, args, 0))
+ return args[0].res.get()
+ return new
+
+ def async(func):
+ """use as decorator when function does not return anything and asynchron execution is wanted"""
+ def new(*args):
+ args[0].jobs.put((func, args, 1))
+ return True
+ return new
+
+ def run(self):
+ """main loop, which executes commands"""
+
+ self.conn = sqlite3.connect("files.db")
+ self.c = self.conn.cursor()
+ #self.c.execute("PRAGMA synchronous = OFF")
+ self._createTables()
+
+ while True:
+ try:
+ f, args, async = self.jobs.get()
+ if f == "quit": return True
+ res = f(*args)
+ if not async: self.res.put(res)
+ except Exception, e:
+ #@TODO log etc
+ print "Database Error @", f.__name__, args[1:], e
+ traceback.print_exc()
+ if not async: self.res.put(None)
+
+ def shutdown(self):
+ self.save()
+ self.jobs.put(("quit", "", 0))
+
+ def _createTables(self):
+ """create tables for database"""
+
+ self.c.execute('CREATE TABLE IF NOT EXISTS "packages" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" TEXT NOT NULL, "folder" TEXT, "password" TEXT, "site" TEXT, "queue" INTEGER DEFAULT 0 NOT NULL)')
+ self.c.execute('CREATE TABLE IF NOT EXISTS "links" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "url" TEXT NOT NULL, "name" TEXT, "size" INTEGER DEFAULT 0 NOT NULL, "status" INTEGER DEFAULT 3 NOT NULL, "plugin" TEXT DEFAULT "BasePlugin" NOT NULL, "error" TEXT, "package" INTEGER DEFAULT 0 NOT NULL, FOREIGN KEY(package) REFERENCES packages(id))')
+ self.c.execute('CREATE INDEX IF NOT EXISTS "pIdIndex" ON links(package)')
+ self.c.execute('VACUUM')
+
+ #----------------------------------------------------------------------
+ @queue
+ def filecount(self, queue):
+ """returns number of files in queue"""
+ self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id", (queue,))
+ r = self.c.fetchall()
+ return len(r)
+
+ @queue
+ def addLink(self, url, name, plugin, package):
+ self.c.execute('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', (url, name, plugin, package))
+ return self.c.lastrowid
+
+ @queue
+ def addLinks(self, links, package):
+ """ links is a list of tupels (url,name,plugin)"""
+ self.c.executemany('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', links)
+
+ @queue
+ def addPackage(self, name, folder, queue):
+
+ self.c.execute('INSERT INTO packages(name, folder, queue) VALUES(?,?,?)', (name, folder, queue))
+ return self.c.lastrowid
+
+ @queue
+ def deletePackage(self, id):
+
+ self.c.execute('DELETE FROM links WHERE package=?', (str(id), ))
+ self.c.execute('DELETE FROM packages WHERE id=?', (str(id), ))
+
+ @queue
+ def deleteLink(self, id):
+
+ self.c.execute('DELETE FROM links WHERE id=?', (str(id), ))
+
+
+ @queue
+ def getAllLinks(self, q):
+ """return information about all links in queue q
+
+ q0 queue
+ q1 collector
+
+ format:
+
+ {
+ id: {'name': name, ... 'package': id }, ...
+ }
+
+ """
+ self.c.execute('SELECT l.id,l.url,l.name,l.size,l.status,l.error,l.plugin,l.package FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id', (q, ))
+ data = {}
+ for r in self.c:
+ data[int(r[0])] = {
+ 'url': r[1],
+ 'name': r[2],
+ 'size': r[3],
+ 'status': r[4],
+ 'statusmsg': self.manager.statusMsg[r[4]],
+ 'error': r[5],
+ 'plugin': r[6],
+ 'package': r[7]
+ }
+
+ return data
+
+ @queue
+ def getAllPackages(self, q):
+ """return information about packages in queue q
+ (only useful in get all data)
+
+ q0 queue
+ q1 collector
+
+ format:
+
+ {
+ id: {'name': name ... 'links': {} }, ...
+ }
+ """
+ self.c.execute('SELECT id,name,folder,site,password,queue FROM packages WHERE queue=? ORDER BY id', str(q))
+
+ data = {}
+ for r in self.c:
+ data[int(r[0])] = {
+ 'name': r[1],
+ 'folder': r[2],
+ 'site': r[3],
+ 'password': r[4],
+ 'queue': r[5],
+ 'links': {}
+ }
+
+ return data
+
+
+ def getLinkData(self, id):
+ """get link information"""
+ pass
+
+ def getPackageData(self, id):
+ """get package data _with_ link data"""
+ pass
+
+
+ @async
+ def updateLink(self, f):
+ self.c.execute('UPDATE links SET url=?,name=?,size=?,status=?,error=?,package=? WHERE id=?', (f.name, f.url, f.size, f.status, f.error, str(f.packageid), str(f.id)))
+
+ @async
+ def updatePackage(self, p):
+ self.c.execute('UPDATE packages SET name=?,folder=?,site=?,password=?,queue=? WHERE id=?', (p.name, p.folder, p.site, p.password, p.queue, str(p.id)))
+
+ @async
+ def commit(self):
+ self.conn.commit()
+
+ @queue
+ def getPackage(self, id):
+ """return package instance from id"""
+ self.c.execute("SELECT name,folder,site,password,queue FROM packages WHERE id=?", (str(id),))
+ r = self.c.fetchone()
+ if not r: return None
+ return PyPackage(self.manager, id, *r)
+
+ #----------------------------------------------------------------------
+ @queue
+ def getFile(self, id):
+ """return link instance from id"""
+ self.c.execute("SELECT url, name, size, status, error, plugin, package FROM links WHERE id=?", (str(id),))
+ r = self.c.fetchone()
+ if not r: return None
+ return PyFile(self.manager, id, *r)
+
+
+ @queue
+ def getJob(self, occ):
+ """return pyfile instance, which is suitable for download and dont use a occupied plugin"""
+ self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=1 AND l.plugin NOT IN ('else','some','else') AND l.status IN (2,3,6) LIMIT 5")
+
+ return [x[0] for x in self.c ]
+
+class PyFile():
+ def __init__(self, manager, id, url, name, size, status, error, pluginname, package):
+ self.m = manager
+ self.m.cache[int(id)] = self
+
+ self.id = int(id)
+ self.url = url
+ self.name = name
+ self.size = size
+ self.status = status
+ self.pluginname = pluginname
+ self.packageid = package #should not be used, use package() instead
+ self.error = error
+ # database information ends here
+
+ self.waitUntil = 0 # time() + time to wait
+
+ # status attributes
+ self.active = False #obsolete?
+ self.abort = False
+ self.reconnected = False
+
+
+ def __repr__(self):
+ return "PyFile %s: %s@%s" % (self.id, self.name, self.pluginname)
+
+ def initPlugin(self):
+ """ inits plugin instance """
+ self.pluginmodule = self.m.core.pluginManager.getPlugin(self.pluginname)
+ self.pluginclass = getattr(self.pluginmodule, self.pluginname)
+ self.plugin = self.pluginclass(self)
+
+
+ def package(self):
+ """ return package instance"""
+ return self.m.getPackage(self.packageid)
+
+ def setStatus(self, status):
+ self.status = statusMap[status]
+ self.sync() #@TODO needed aslong no better job approving exists
+
+ def hasStatus(self, status):
+ return statusMap[status] == self.status
+
+ def sync(self):
+ """sync PyFile instance with database"""
+ self.m.updateLink(self)
+
+ def release(self):
+ """sync and remove from cache"""
+ self.sync()
+ self.m.releaseLink(self.id)
+
+ def delete(self):
+ """delete pyfile from database"""
+ self.m.deleteLink(self.id)
+
+ def toDict(self):
+ """return dict with all information for interface"""
+ return self.toDbDict()
+
+ def toDbDict(self):
+ """return data as dict for databse
+
+ format:
+
+ {
+ id: {'url': url, 'name': name ... }
+ }
+
+ """
+ return {
+ self.id: {
+ 'url': self.url,
+ 'name': self.name,
+ 'plugin' : self.pluginname,
+ 'size': self.size,
+ 'status': self.status,
+ 'statusmsg': self.m.statusMsg[self.status],
+ 'package': self.packageid,
+ 'error': self.error
+ }
+ }
+
+ def abort(self):
+ """abort pyfile if possible"""
+
+ while self.id in self.m.core.ThreadManager.processingIds():
+ self.abort = True
+ sleep(0.025)
+
+ abort = False
+
+ def finishIfDone(self):
+ """set status to finish and release file if every thread is finished with it"""
+
+ if self.id in self.m.core.threadManager.processingIds():
+ return False
+
+ self.setStatus("finished")
+ self.release()
+ return True
+
+ def formatWait(self):
+ """ formats and return wait time in humanreadable format """
+ return self.waitUntil - time()
+
+
+
+class PyPackage():
+ def __init__(self, manager, id, name, folder, site, password, queue):
+ self.m = manager
+ self.m.packageCache[int(id)] = self
+
+ self.id = int(id)
+ self.name = name
+ self.folder = folder
+ self.site = site
+ self.password = password
+ self.queue = queue
+
+ def toDict(self):
+ """return data as dict
+
+ format:
+
+ {
+ id: {'name': name ... 'links': {} } }
+ }
+
+ """
+ return {
+ self.id: {
+ 'name': self.name,
+ 'folder': self.folder,
+ 'site': self.site,
+ 'password': self.password,
+ 'queue': self.queue,
+ 'links': {}
+ }
+ }
+
+ def getChildren(self):
+ """get information about contained links"""
+ raise NotImplementedError
+
+ def sync(self):
+ """sync with db"""
+ self.m.updatePackage(self)
+
+ def release(self):
+ """sync and delete from cache"""
+ self.sync()
+ self.m.releasePackage(self.id)
+
+ def delete(self):
+ self.m.deletePackage(self.id)
+
+
+if __name__ == "__main__":
+
+ pypath = "."
+
+ db = FileHandler(None)
+
+ #p = PyFile(db, 5)
+ #sleep(0.1)
+
+ a = time()
+
+ #print db.addPackage("package", "folder" , 1)
+
+ #print db.addPackage("package", "folder", 1)
+
+ #db.addLinks([x for x in range(0,200)], 5)
+
+ db.save()
+
+ b = time()
+ print "adding 200 links, single sql execs, no commit", b-a
+
+
+ res = db.getCompleteData(1)
+ #print res
+ r = [ len(x["links"]) for x in res.itervalues() ]
+ print r
+ c = time()
+ print "get all links", c-b
+
+ #i = 2
+ #db.updateLink(i, "urlupdate%s" % i, "nameupdate%s" % i, i, i, i,i)
+
+ d = time()
+ print "update one link", d-c
+
+ #p.sync()
+ #p.remove()
+
+ e = time()
+ print "sync and remove link", e-d
+
+ db.save()
+
+ db.deletePackage(1)
+ #db.commit()
+
+ f = time()
+ print "commit, remove package/links, commit", f-e
+
+ #db.commit()
+ sleep(0.5)
+
+ g = time()
+ print "empty commit", g-f -0.5
+
+
+ job = db.getJob("")
+ print job
+
+ h = time()
+ #print db.getFileCount()
+
+ print "get job", h-g
+
+ print db.getFileCount()
+
+ i = time()
+
+ print "file count", i-h
+
+
+ print db.getJob("")
+
+ j = time()
+
+
+ print "get job 2", j-i
+
+ for i in db.cache.itervalues():
+ i.sync()
+
+ sleep(1)
+
+ \ No newline at end of file