diff options
Diffstat (limited to 'module/FileDatabase.py')
-rw-r--r-- | module/FileDatabase.py | 1579 |
1 files changed, 805 insertions, 774 deletions
diff --git a/module/FileDatabase.py b/module/FileDatabase.py index 7e8d043d2..dde151cb3 100644 --- a/module/FileDatabase.py +++ b/module/FileDatabase.py @@ -14,6 +14,7 @@ along with this program; if not, see <http://www.gnu.org/licenses/>. @author: RaNaN + @author: mkaay """ from Queue import Queue import sqlite3 @@ -23,6 +24,8 @@ from time import sleep from time import time import traceback +from module.PullEvents import UpdateEvent, RemoveEvent, InsertEvent + statusMap = { "finished": 0, "offline": 1, @@ -41,814 +44,842 @@ statusMap = { } def formatSize(size): - """formats size of bytes""" - size = int(size) - steps = 0 - sizes = ["B", "KB", "MB", "GB" , "TB"] - - while size > 1000: - size /= 1024.0 - steps += 1 - - return "%.2f %s" % (size, sizes[steps]) + """formats size of bytes""" + size = int(size) + steps = 0 + sizes = ["B", "KB", "MB", "GB" , "TB"] + + while size > 1000: + size /= 1024.0 + steps += 1 + + return "%.2f %s" % (size, sizes[steps]) + ######################################################################## class FileHandler: - """Handles all request made to obtain information, - modify status or other request for links or packages""" - - - #---------------------------------------------------------------------- - def __init__(self, core): - """Constructor""" - self.core = core - - # translations - self.statusMsg = [_("finished"), _("offline"), _("online"), _("queued"), _("checking"), _("waiting"), _("reconnected"), _("starting"),_("failed"), _("aborted"), _("decrypting"), _("custom"),_("downloading"), _("processing")] - - self.cache = {} #holds instances for files - self.packageCache = {} # same for packages - #@TODO: purge the cache - - self.jobCache = {} - - self.lock = RLock() - - self.filecount = -1 # if an invalid value is set get current value from db - self.unchanged = False #determines if any changes was made since last call - - self.db = FileDatabaseBackend(self) # the backend - - - def change(func): - def new(*args): - args[0].unchanged = False - args[0].filecount = -1 - args[0].jobCache = {} - return func(*args) - return new - - #---------------------------------------------------------------------- - def save(self): - """saves all data to backend""" - self.db.commit() - - #---------------------------------------------------------------------- - def syncSave(self): - """saves all data to backend and waits until all data are written""" - self.db.syncSave() - - #---------------------------------------------------------------------- - def getCompleteData(self, queue=1): - """gets a complete data representation""" - - data = self.db.getAllLinks(queue) - packs = self.db.getAllPackages(queue) - - data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) - packs.update( [ (str(x.id), x.toDict()[x.id]) for x in self.packageCache.itervalues() if x.queue == queue] ) - - for key, value in data.iteritems(): - if packs.has_key(str(value["package"])): - packs[str(value["package"])]["links"][key] = value - - return packs - - #---------------------------------------------------------------------- - @change - def addLinks(self, urls, package): - """adds links""" - - # tuple of (url, name, plugin, package) - links = [ (x[0], x[0], x[1], package) for x in self.core.pluginManager.parseUrls(urls) ] - - self.db.addLinks(links, package) - - - #---------------------------------------------------------------------- - @change - def addPackage(self, name, folder, queue=0): - """adds a package, default to link collector""" - return self.db.addPackage(name, folder, queue) - - #---------------------------------------------------------------------- - @change - def deletePackage(self, id): - """delete package and all contained links""" - - self.lock.acquire() - - if self.packageCache.has_key(id): - del self.packageCache[id] - - pyfiles = self.cache.values() - - for pyfile in pyfiles: - if pyfile.packageid == id: - pyfile.abortDownload() - pyfile.release() - - - self.db.deletePackage(id) - - - self.lock.release() - - #---------------------------------------------------------------------- - @change - def deleteLink(self, id): - """deletes links""" - - self.lock.acquire() - - if self.cache.has_key(id): - if id in self.core.threadManager.processingIds(): - self.cache[id].abortDownload() - - - self.lock.release() - - self.db.deleteLink(id) - - #---------------------------------------------------------------------- - def releaseLink(self, id): - """removes pyfile from cache""" - if self.cache.has_key(id): - del self.cache[id] - - #---------------------------------------------------------------------- - def releasePackage(self, id): - """removes package from cache""" - if self.packageCache.has_key(id): - del self.packageCache[id] - - #---------------------------------------------------------------------- - def updateLink(self, pyfile): - """updates link""" - self.db.updateLink(pyfile) - - #---------------------------------------------------------------------- - def updatePackage(self, pypack): - """updates a package""" - self.db.updatePackage(pypack) - - #---------------------------------------------------------------------- - def getPackage(self, id): - """return package instance""" - - if self.packageCache.has_key(id): - return self.packageCache[id] - else: - return self.db.getPackage(id) - - #---------------------------------------------------------------------- - def getPackageData(self, id): - """returns dict with package information""" - pack = self.getPackage(id) - pack = pack.toDict()[id] - - data = self.db.getPackageData(id) - - data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) - - pack["links"] = data - - return pack - - #---------------------------------------------------------------------- - def getFile(self, id): - """returns pyfile instance""" - if self.cache.has_key(id): - return self.cache[id] - else: - return self.db.getFile(id) - - #---------------------------------------------------------------------- - def getJob(self, occ): - """get suitable job""" - - self.lock.acquire() - - #@TODO clean mess - - if self.jobCache.has_key(occ): - if self.jobCache[occ]: - id = self.jobCache[occ].pop() - if id == "empty": - pyfile = None - else: - pyfile = self.getFile(id) - else: - jobs = self.db.getJob(occ) - jobs.reverse() - if not jobs: - self.jobCache[occ].append("empty") - pyfile = None - else: - self.jobCache[occ].extend(jobs) - pyfile = self.getFile(self.jobCache[occ].pop()) - - else: - self.jobCache = {} #better not caching to much - jobs = self.db.getJob(occ) - jobs.reverse() - self.jobCache[occ] = jobs - - if not jobs: - self.jobCache[occ].append("empty") - pyfile = None - - pyfile = self.getFile(self.jobCache[occ].pop()) - #@TODO: maybe the new job has to be approved... - - - #pyfile = self.getFile(self.jobCache[occ].pop()) - - self.lock.release() - return pyfile - - - #---------------------------------------------------------------------- - def getFileCount(self): - """returns number of files""" - - if self.filecount == -1: - self.filecount = self.db.filecount(1) - - return self.filecount - - #---------------------------------------------------------------------- - def getQueueCount(self): - """number of files that have to be processed""" - pass - - #---------------------------------------------------------------------- - def restartPackage(self, id): - """restart package""" - for pyfile in self.cache.itervalues(): - if pyfile.packageid == id: - self.restartFile(pyfile.id) - - self.db.restartPackage(id) - - def restartFile(self, id): - """ restart file""" - if self.cache.has_key(id): - self.cache[id].abortDownload() - self.cache[id].status = 3 - self.cache[id].name = self.cache[id].url - self.cache[id].sync() - else: - self.db.restartFile(id) - + """Handles all request made to obtain information, + modify status or other request for links or packages""" + + + #---------------------------------------------------------------------- + def __init__(self, core): + """Constructor""" + self.core = core + + # translations + self.statusMsg = [_("finished"), _("offline"), _("online"), _("queued"), _("checking"), _("waiting"), _("reconnected"), _("starting"),_("failed"), _("aborted"), _("decrypting"), _("custom"),_("downloading"), _("processing")] + + self.cache = {} #holds instances for files + self.packageCache = {} # same for packages + #@TODO: purge the cache + + self.jobCache = {} + + self.lock = RLock() + + self.filecount = -1 # if an invalid value is set get current value from db + self.unchanged = False #determines if any changes was made since last call + + self.db = FileDatabaseBackend(self) # the backend + + + def change(func): + def new(*args): + args[0].unchanged = False + args[0].filecount = -1 + args[0].jobCache = {} + return func(*args) + return new + + #---------------------------------------------------------------------- + def save(self): + """saves all data to backend""" + self.db.commit() + + #---------------------------------------------------------------------- + def syncSave(self): + """saves all data to backend and waits until all data are written""" + self.db.syncSave() + + #---------------------------------------------------------------------- + def getCompleteData(self, queue=1): + """gets a complete data representation""" + + data = self.db.getAllLinks(queue) + packs = self.db.getAllPackages(queue) + + data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) + packs.update( [ (str(x.id), x.toDict()[x.id]) for x in self.packageCache.itervalues() if x.queue == queue] ) + + for key, value in data.iteritems(): + if packs.has_key(str(value["package"])): + packs[str(value["package"])]["links"][key] = value + + return packs + + #---------------------------------------------------------------------- + @change + def addLinks(self, urls, package): + """adds links""" + + for x in self.core.pluginManager.parseUrls(urls): + # tuple of (url, name, plugin, package) + lastID = self.db.addLink(x[0], x[0], x[1], package) + e = InsertEvent("file", lastID, -1, "collector" if not self.getPackage(package).queue else "queue") + self.core.pullManager.addEvent(e) + + + #---------------------------------------------------------------------- + @change + def addPackage(self, name, folder, queue=0): + """adds a package, default to link collector""" + lastID = self.db.addPackage(name, folder, queue) + e = InsertEvent("pack", lastID, -1, "collector" if not queue else "queue") + self.core.pullManager.addEvent(e) + return lastID + + #---------------------------------------------------------------------- + @change + def deletePackage(self, id): + """delete package and all contained links""" + + self.lock.acquire() + + if self.packageCache.has_key(id): + del self.packageCache[id] + + pyfiles = self.cache.values() + + for pyfile in pyfiles: + if pyfile.packageid == id: + pyfile.abortDownload() + pyfile.release() + + self.db.deletePackage(id) + + e = RemoveEvent("pack", id, "collector" if not self.getPackage(id).queue else "queue") + self.core.pullManager.addEvent(e) + + self.lock.release() + + #---------------------------------------------------------------------- + @change + def deleteLink(self, id): + """deletes links""" + + self.lock.acquire() + + e = RemoveEvent("file", id, "collector" if not self.getFile(id).package().queue else "queue") + + if self.cache.has_key(id): + if id in self.core.threadManager.processingIds(): + self.cache[id].abortDownload() + + self.lock.release() + + self.db.deleteLink(id) + + self.core.pullManager.addEvent(e) + + #---------------------------------------------------------------------- + def releaseLink(self, id): + """removes pyfile from cache""" + if self.cache.has_key(id): + del self.cache[id] + + #---------------------------------------------------------------------- + def releasePackage(self, id): + """removes package from cache""" + if self.packageCache.has_key(id): + del self.packageCache[id] + + #---------------------------------------------------------------------- + def updateLink(self, pyfile): + """updates link""" + self.db.updateLink(pyfile) + + e = UpdateEvent("file", pyfile.id, "collector" if not pyfile.package().queue else "queue") + self.core.pullManager.addEvent(e) + + #---------------------------------------------------------------------- + def updatePackage(self, pypack): + """updates a package""" + self.db.updatePackage(pypack) + + e = UpdateEvent("pack", pypack.id, "collector" if not pypack.queue else "queue") + self.core.pullManager.addEvent(e) + + #---------------------------------------------------------------------- + def getPackage(self, id): + """return package instance""" + + if self.packageCache.has_key(id): + return self.packageCache[id] + else: + return self.db.getPackage(id) + + #---------------------------------------------------------------------- + def getPackageData(self, id): + """returns dict with package information""" + pack = self.getPackage(id) + pack = pack.toDict()[id] + + data = self.db.getPackageData(id) + + data.update( [ (str(x.id), x.toDbDict()[x.id]) for x in self.cache.itervalues() ] ) + + pack["links"] = data + + return pack + + #---------------------------------------------------------------------- + def getFileData(self, id): + """returns dict with package information""" + pyfile = self.getFile(id) + + return pyfile.toDbDict() + + #---------------------------------------------------------------------- + def getFile(self, id): + """returns pyfile instance""" + if self.cache.has_key(id): + return self.cache[id] + else: + return self.db.getFile(id) + + #---------------------------------------------------------------------- + def getJob(self, occ): + """get suitable job""" + + self.lock.acquire() + + #@TODO clean mess + + if self.jobCache.has_key(occ): + if self.jobCache[occ]: + id = self.jobCache[occ].pop() + if id == "empty": + pyfile = None + else: + pyfile = self.getFile(id) + else: + jobs = self.db.getJob(occ) + jobs.reverse() + if not jobs: + self.jobCache[occ].append("empty") + pyfile = None + else: + self.jobCache[occ].extend(jobs) + pyfile = self.getFile(self.jobCache[occ].pop()) + + else: + self.jobCache = {} #better not caching to much + jobs = self.db.getJob(occ) + jobs.reverse() + self.jobCache[occ] = jobs + + if not jobs: + self.jobCache[occ].append("empty") + pyfile = None + + pyfile = self.getFile(self.jobCache[occ].pop()) + #@TODO: maybe the new job has to be approved... + + + #pyfile = self.getFile(self.jobCache[occ].pop()) + + self.lock.release() + return pyfile + + + #---------------------------------------------------------------------- + def getFileCount(self): + """returns number of files""" + + if self.filecount == -1: + self.filecount = self.db.filecount(1) + + return self.filecount + + #---------------------------------------------------------------------- + def getQueueCount(self): + """number of files that have to be processed""" + pass + + #---------------------------------------------------------------------- + def restartPackage(self, id): + """restart package""" + for pyfile in self.cache.itervalues(): + if pyfile.packageid == id: + self.restartFile(pyfile.id) + + self.db.restartPackage(id) + + e = UpdateEvent("pack", id, "collector" if not self.getPackage(id).queue else "queue") + self.core.pullManager.addEvent(e) + + def restartFile(self, id): + """ restart file""" + if self.cache.has_key(id): + self.cache[id].abortDownload() + self.cache[id].status = 3 + self.cache[id].name = self.cache[id].url + self.cache[id].sync() + else: + self.db.restartFile(id) + + e = UpdateEvent("file", id, "collector" if not self.getFile(id).package().queue else "queue") + self.core.pullManager.addEvent(e) + ######################################################################### class FileDatabaseBackend(Thread): - """underlying backend for the filehandler to save the data""" - - def __init__(self, manager): - Thread.__init__(self) - - self.setDaemon(True) + """underlying backend for the filehandler to save the data""" + + def __init__(self, manager): + Thread.__init__(self) + + self.setDaemon(True) - self.manager = manager - - self.jobs = Queue() # queues for jobs - self.res = Queue() - - self.start() - - - def queue(func): - """use as decorator when fuction directly executes sql commands""" - def new(*args): - args[0].jobs.put((func, args, 0)) - return args[0].res.get() - return new - - def async(func): - """use as decorator when function does not return anything and asynchron execution is wanted""" - def new(*args): - args[0].jobs.put((func, args, 1)) - return True - return new - - def run(self): - """main loop, which executes commands""" - - self.conn = sqlite3.connect("files.db") - self.c = self.conn.cursor() - #self.c.execute("PRAGMA synchronous = OFF") - self._createTables() - - while True: - try: - f, args, async = self.jobs.get() - if f == "quit": return True - res = f(*args) - if not async: self.res.put(res) - except Exception, e: - #@TODO log etc - print "Database Error @", f.__name__, args[1:], e - traceback.print_exc() - if not async: self.res.put(None) - - def shutdown(self): - self.save() - self.jobs.put(("quit", "", 0)) - - def _createTables(self): - """create tables for database""" - - self.c.execute('CREATE TABLE IF NOT EXISTS "packages" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" TEXT NOT NULL, "folder" TEXT, "password" TEXT, "site" TEXT, "queue" INTEGER DEFAULT 0 NOT NULL)') - self.c.execute('CREATE TABLE IF NOT EXISTS "links" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "url" TEXT NOT NULL, "name" TEXT, "size" INTEGER DEFAULT 0 NOT NULL, "status" INTEGER DEFAULT 3 NOT NULL, "plugin" TEXT DEFAULT "BasePlugin" NOT NULL, "error" TEXT DEFAULT "", "package" INTEGER DEFAULT 0 NOT NULL, FOREIGN KEY(package) REFERENCES packages(id))') - self.c.execute('CREATE INDEX IF NOT EXISTS "pIdIndex" ON links(package)') - self.c.execute('VACUUM') - - #---------------------------------------------------------------------- - @queue - def filecount(self, queue): - """returns number of files in queue""" - self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id", (queue,)) - r = self.c.fetchall() - return len(r) - - @queue - def addLink(self, url, name, plugin, package): - self.c.execute('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', (url, name, plugin, package)) - return self.c.lastrowid - - @queue - def addLinks(self, links, package): - """ links is a list of tupels (url,name,plugin)""" - self.c.executemany('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', links) - - @queue - def addPackage(self, name, folder, queue): - - self.c.execute('INSERT INTO packages(name, folder, queue) VALUES(?,?,?)', (name, folder, queue)) - return self.c.lastrowid - - @queue - def deletePackage(self, id): - - self.c.execute('DELETE FROM links WHERE package=?', (str(id), )) - self.c.execute('DELETE FROM packages WHERE id=?', (str(id), )) - - @queue - def deleteLink(self, id): - - self.c.execute('DELETE FROM links WHERE id=?', (str(id), )) - - - @queue - def getAllLinks(self, q): - """return information about all links in queue q - - q0 queue - q1 collector - - format: - - { - id: {'name': name, ... 'package': id }, ... - } - - """ - self.c.execute('SELECT l.id,l.url,l.name,l.size,l.status,l.error,l.plugin,l.package FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id', (q, )) - data = {} - for r in self.c: - data[str(r[0])] = { - 'url': r[1], - 'name': r[2], - 'size': r[3], - 'format_size': formatSize(r[3]), - 'status': r[4], - 'statusmsg': self.manager.statusMsg[r[4]], - 'error': r[5], - 'plugin': r[6], - 'package': r[7] - } - - return data - - @queue - def getAllPackages(self, q): - """return information about packages in queue q - (only useful in get all data) - - q0 queue - q1 collector - - format: - - { - id: {'name': name ... 'links': {} }, ... - } - """ - self.c.execute('SELECT id,name,folder,site,password,queue FROM packages WHERE queue=? ORDER BY id', str(q)) - - data = {} - for r in self.c: - data[str(r[0])] = { - 'name': r[1], - 'folder': r[2], - 'site': r[3], - 'password': r[4], - 'queue': r[5], - 'links': {} - } - - return data - - - def getLinkData(self, id): - """get link information""" - pass - - @queue - def getPackageData(self, id): - """get package data""" - self.c.execute('SELECT id,url,name,size,status,error,plugin,package FROM links WHERE package=? ORDER BY id', (str(id),)) - - data = {} - for r in self.c: - data[str(r[0])] = { - 'url': r[1], - 'name': r[2], - 'size': r[3], - 'format_size': formatSize(r[3]), - 'status': r[4], - 'statusmsg': self.manager.statusMsg[r[4]], - 'error': r[5], - 'plugin': r[6], - 'package': r[7] - } - - return data - - - @async - def updateLink(self, f): - self.c.execute('UPDATE links SET url=?,name=?,size=?,status=?,error=?,package=? WHERE id=?', (f.url, f.name, f.size, f.status, f.error, str(f.packageid), str(f.id))) - - @async - def updatePackage(self, p): - self.c.execute('UPDATE packages SET name=?,folder=?,site=?,password=?,queue=? WHERE id=?', (p.name, p.folder, p.site, p.password, p.queue, str(p.id))) - - @async - def restartFile(self, id): - self.c.execute('UPDATE links SET status=3 WHERE id=?', ( str(id), ) ) - - @async - def restartPackage(self, id): - self.c.execute('UPDATE links SET status=3 WHERE package=?', ( str(id), ) ) - - @async - def commit(self): - self.conn.commit() - - @queue - def syncSave(self): - self.conn.commit() - - @queue - def getPackage(self, id): - """return package instance from id""" - self.c.execute("SELECT name,folder,site,password,queue FROM packages WHERE id=?", (str(id),)) - r = self.c.fetchone() - if not r: return None - return PyPackage(self.manager, id, *r) - - #---------------------------------------------------------------------- - @queue - def getFile(self, id): - """return link instance from id""" - self.c.execute("SELECT url, name, size, status, error, plugin, package FROM links WHERE id=?", (str(id),)) - r = self.c.fetchone() - if not r: return None - return PyFile(self.manager, id, *r) - - - @queue - def getJob(self, occ): - """return pyfile instance, which is suitable for download and dont use a occupied plugin""" - - cmd = "(" - i = 0 - for item in occ: - if i != 0: cmd += ", " - cmd += "'%s'" % item - - cmd += ")" - - cmd = "SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=1 AND l.plugin NOT IN %s AND l.status IN (2,3,6) LIMIT 5" % cmd - - self.c.execute(cmd) # very bad! - - return [x[0] for x in self.c ] + self.manager = manager + + self.jobs = Queue() # queues for jobs + self.res = Queue() + + self.start() + + + def queue(func): + """use as decorator when fuction directly executes sql commands""" + def new(*args): + args[0].jobs.put((func, args, 0)) + return args[0].res.get() + return new + + def async(func): + """use as decorator when function does not return anything and asynchron execution is wanted""" + def new(*args): + args[0].jobs.put((func, args, 1)) + return True + return new + + def run(self): + """main loop, which executes commands""" + + self.conn = sqlite3.connect("files.db") + self.c = self.conn.cursor() + #self.c.execute("PRAGMA synchronous = OFF") + self._createTables() + + while True: + try: + f, args, async = self.jobs.get() + if f == "quit": return True + res = f(*args) + if not async: self.res.put(res) + except Exception, e: + #@TODO log etc + print "Database Error @", f.__name__, args[1:], e + traceback.print_exc() + if not async: self.res.put(None) + + def shutdown(self): + self.save() + self.jobs.put(("quit", "", 0)) + + def _createTables(self): + """create tables for database""" + + self.c.execute('CREATE TABLE IF NOT EXISTS "packages" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "name" TEXT NOT NULL, "folder" TEXT, "password" TEXT, "site" TEXT, "queue" INTEGER DEFAULT 0 NOT NULL)') + self.c.execute('CREATE TABLE IF NOT EXISTS "links" ("id" INTEGER PRIMARY KEY AUTOINCREMENT, "url" TEXT NOT NULL, "name" TEXT, "size" INTEGER DEFAULT 0 NOT NULL, "status" INTEGER DEFAULT 3 NOT NULL, "plugin" TEXT DEFAULT "BasePlugin" NOT NULL, "error" TEXT DEFAULT "", "package" INTEGER DEFAULT 0 NOT NULL, FOREIGN KEY(package) REFERENCES packages(id))') + self.c.execute('CREATE INDEX IF NOT EXISTS "pIdIndex" ON links(package)') + self.c.execute('VACUUM') + + #---------------------------------------------------------------------- + @queue + def filecount(self, queue): + """returns number of files in queue""" + self.c.execute("SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id", (queue,)) + r = self.c.fetchall() + return len(r) + + @queue + def addLink(self, url, name, plugin, package): + self.c.execute('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', (url, name, plugin, package)) + return self.c.lastrowid + + @queue + def addLinks(self, links, package): + """ links is a list of tupels (url,name,plugin)""" + self.c.executemany('INSERT INTO links(url, name, plugin, package) VALUES(?,?,?,?)', links) + + @queue + def addPackage(self, name, folder, queue): + + self.c.execute('INSERT INTO packages(name, folder, queue) VALUES(?,?,?)', (name, folder, queue)) + return self.c.lastrowid + + @queue + def deletePackage(self, id): + + self.c.execute('DELETE FROM links WHERE package=?', (str(id), )) + self.c.execute('DELETE FROM packages WHERE id=?', (str(id), )) + + @queue + def deleteLink(self, id): + + self.c.execute('DELETE FROM links WHERE id=?', (str(id), )) + + + @queue + def getAllLinks(self, q): + """return information about all links in queue q + + q0 queue + q1 collector + + format: + + { + id: {'name': name, ... 'package': id }, ... + } + + """ + self.c.execute('SELECT l.id,l.url,l.name,l.size,l.status,l.error,l.plugin,l.package FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=? ORDER BY l.id', (q, )) + data = {} + for r in self.c: + data[str(r[0])] = { + 'url': r[1], + 'name': r[2], + 'size': r[3], + 'format_size': formatSize(r[3]), + 'status': r[4], + 'statusmsg': self.manager.statusMsg[r[4]], + 'error': r[5], + 'plugin': r[6], + 'package': r[7] + } + + return data + + @queue + def getAllPackages(self, q): + """return information about packages in queue q + (only useful in get all data) + + q0 queue + q1 collector + + format: + + { + id: {'name': name ... 'links': {} }, ... + } + """ + self.c.execute('SELECT id,name,folder,site,password,queue FROM packages WHERE queue=? ORDER BY id', str(q)) + + data = {} + for r in self.c: + data[str(r[0])] = { + 'name': r[1], + 'folder': r[2], + 'site': r[3], + 'password': r[4], + 'queue': r[5], + 'links': {} + } + + return data + + + def getLinkData(self, id): + """get link information""" + pass + + @queue + def getPackageData(self, id): + """get package data""" + self.c.execute('SELECT id,url,name,size,status,error,plugin,package FROM links WHERE package=? ORDER BY id', (str(id),)) + + data = {} + for r in self.c: + data[str(r[0])] = { + 'url': r[1], + 'name': r[2], + 'size': r[3], + 'format_size': formatSize(r[3]), + 'status': r[4], + 'statusmsg': self.manager.statusMsg[r[4]], + 'error': r[5], + 'plugin': r[6], + 'package': r[7] + } + + return data + + + @async + def updateLink(self, f): + self.c.execute('UPDATE links SET url=?,name=?,size=?,status=?,error=?,package=? WHERE id=?', (f.url, f.name, f.size, f.status, f.error, str(f.packageid), str(f.id))) + + @async + def updatePackage(self, p): + self.c.execute('UPDATE packages SET name=?,folder=?,site=?,password=?,queue=? WHERE id=?', (p.name, p.folder, p.site, p.password, p.queue, str(p.id))) + + @async + def restartFile(self, id): + self.c.execute('UPDATE links SET status=3 WHERE id=?', ( str(id), ) ) + + @async + def restartPackage(self, id): + self.c.execute('UPDATE links SET status=3 WHERE package=?', ( str(id), ) ) + + @async + def commit(self): + self.conn.commit() + + @queue + def syncSave(self): + self.conn.commit() + + @queue + def getPackage(self, id): + """return package instance from id""" + self.c.execute("SELECT name,folder,site,password,queue FROM packages WHERE id=?", (str(id),)) + r = self.c.fetchone() + if not r: return None + return PyPackage(self.manager, id, *r) + + #---------------------------------------------------------------------- + @queue + def getFile(self, id): + """return link instance from id""" + self.c.execute("SELECT url, name, size, status, error, plugin, package FROM links WHERE id=?", (str(id),)) + r = self.c.fetchone() + if not r: return None + return PyFile(self.manager, id, *r) + + + @queue + def getJob(self, occ): + """return pyfile instance, which is suitable for download and dont use a occupied plugin""" + + cmd = "(" + i = 0 + for item in occ: + if i != 0: cmd += ", " + cmd += "'%s'" % item + + cmd += ")" + + cmd = "SELECT l.id FROM links as l INNER JOIN packages as p ON l.package=p.id WHERE p.queue=1 AND l.plugin NOT IN %s AND l.status IN (2,3,6) LIMIT 5" % cmd + + self.c.execute(cmd) # very bad! + + return [x[0] for x in self.c ] class PyFile(): - def __init__(self, manager, id, url, name, size, status, error, pluginname, package): - self.m = manager - self.m.cache[int(id)] = self - - self.id = int(id) - self.url = url - self.name = name - self.size = size - self.status = status - self.pluginname = pluginname - self.packageid = package #should not be used, use package() instead - self.error = error - # database information ends here - - self.plugin = None - - self.waitUntil = 0 # time() + time to wait - - # status attributes - self.active = False #obsolete? - self.abort = False - self.reconnected = False - - - def __repr__(self): - return "PyFile %s: %s@%s" % (self.id, self.name, self.pluginname) - - def initPlugin(self): - """ inits plugin instance """ - self.pluginmodule = self.m.core.pluginManager.getPlugin(self.pluginname) - self.pluginclass = getattr(self.pluginmodule, self.pluginname) - self.plugin = self.pluginclass(self) - - - def package(self): - """ return package instance""" - return self.m.getPackage(self.packageid) - - def setStatus(self, status): - self.status = statusMap[status] - self.sync() #@TODO needed aslong no better job approving exists - - def hasStatus(self, status): - return statusMap[status] == self.status - - def sync(self): - """sync PyFile instance with database""" - self.m.updateLink(self) - - def release(self): - """sync and remove from cache""" - self.sync() - self.m.releaseLink(self.id) - - def delete(self): - """delete pyfile from database""" - self.m.deleteLink(self.id) - - def toDict(self): - """return dict with all information for interface""" - return self.toDbDict() - - def toDbDict(self): - """return data as dict for databse - - format: - - { - id: {'url': url, 'name': name ... } - } - - """ - return { - self.id: { - 'url': self.url, - 'name': self.name, - 'plugin' : self.pluginname, - 'size': self.getSize(), - 'format_size': self.formatSize(), - 'status': self.status, - 'statusmsg': self.m.statusMsg[self.status], - 'package': self.packageid, - 'error': self.error - } - } - - def abortDownload(self): - """abort pyfile if possible""" - while self.id in self.m.core.threadManager.processingIds(): - self.abort = True - if self.plugin: self.plugin.req.abort = True - sleep(0.1) - - abort = False - self.plugin.req.abort = False - - def finishIfDone(self): - """set status to finish and release file if every thread is finished with it""" - - if self.id in self.m.core.threadManager.processingIds(): - return False - - self.setStatus("finished") - self.release() - return True - - def formatWait(self): - """ formats and return wait time in humanreadable format """ - seconds = self.waitUntil - time() - - if seconds < 0 : return "00:00:00" - - hours, seconds = divmod(seconds, 3600) - minutes, seconds = divmod(seconds, 60) - return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) - - def formatSize(self): - """ formats size to readable format """ - return formatSize(self.getSize()) - - def formatETA(self): - """ formats eta to readable format """ - seconds = self.getETA() - - if seconds < 0 : return "00:00:00" - - hours, seconds = divmod(seconds, 3600) - minutes, seconds = divmod(seconds, 60) - return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) - - def getSpeed(self): - """ calculates speed """ - try: - return self.plugin.req.get_speed() - except: - return 0 - - def getETA(self): - """ gets established time of arrival""" - try: - return self.plugin.req.get_ETA() - except: - return 0 - - def getBytesLeft(self): - """ gets bytes left """ - try: - return self.plugin.req.bytes_left() - except: - return 0 - - def getPercent(self): - """ get % of download """ - try: - return int((float(self.plugin.req.dl_arrived) / self.plugin.req.dl_size) * 100) - except: - return 0 - - def getSize(self): - """ get size of download """ - if self.size: return self.size - else: - try: - return self.plugin.req.dl_size - except: - return 0 + def __init__(self, manager, id, url, name, size, status, error, pluginname, package): + self.m = manager + self.m.cache[int(id)] = self + + self.id = int(id) + self.url = url + self.name = name + self.size = size + self.status = status + self.pluginname = pluginname + self.packageid = package #should not be used, use package() instead + self.error = error + # database information ends here + + self.plugin = None + + self.waitUntil = 0 # time() + time to wait + + # status attributes + self.active = False #obsolete? + self.abort = False + self.reconnected = False + + + def __repr__(self): + return "PyFile %s: %s@%s" % (self.id, self.name, self.pluginname) + + def initPlugin(self): + """ inits plugin instance """ + self.pluginmodule = self.m.core.pluginManager.getPlugin(self.pluginname) + self.pluginclass = getattr(self.pluginmodule, self.pluginname) + self.plugin = self.pluginclass(self) + + + def package(self): + """ return package instance""" + return self.m.getPackage(self.packageid) + + def setStatus(self, status): + self.status = statusMap[status] + self.sync() #@TODO needed aslong no better job approving exists + + def hasStatus(self, status): + return statusMap[status] == self.status + + def sync(self): + """sync PyFile instance with database""" + self.m.updateLink(self) + + def release(self): + """sync and remove from cache""" + self.sync() + self.m.releaseLink(self.id) + + def delete(self): + """delete pyfile from database""" + self.m.deleteLink(self.id) + + def toDict(self): + """return dict with all information for interface""" + return self.toDbDict() + + def toDbDict(self): + """return data as dict for databse + + format: + + { + id: {'url': url, 'name': name ... } + } + + """ + return { + self.id: { + 'url': self.url, + 'name': self.name, + 'plugin' : self.pluginname, + 'size': self.getSize(), + 'format_size': self.formatSize(), + 'status': self.status, + 'statusmsg': self.m.statusMsg[self.status], + 'package': self.packageid, + 'error': self.error + } + } + + def abortDownload(self): + """abort pyfile if possible""" + while self.id in self.m.core.threadManager.processingIds(): + self.abort = True + if self.plugin and self.plugin.req: self.plugin.req.abort = True + sleep(0.1) + + abort = False + self.plugin.req.abort = False + + def finishIfDone(self): + """set status to finish and release file if every thread is finished with it""" + + if self.id in self.m.core.threadManager.processingIds(): + return False + + self.setStatus("finished") + self.release() + return True + + def formatWait(self): + """ formats and return wait time in humanreadable format """ + seconds = self.waitUntil - time() + + if seconds < 0 : return "00:00:00" + + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) + return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) + + def formatSize(self): + """ formats size to readable format """ + return formatSize(self.getSize()) + + def formatETA(self): + """ formats eta to readable format """ + seconds = self.getETA() + + if seconds < 0 : return "00:00:00" + + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) + return "%.2i:%.2i:%.2i" % (hours, minutes, seconds) + + def getSpeed(self): + """ calculates speed """ + try: + return self.plugin.req.get_speed() + except: + return 0 + + def getETA(self): + """ gets established time of arrival""" + try: + return self.plugin.req.get_ETA() + except: + return 0 + + def getBytesLeft(self): + """ gets bytes left """ + try: + return self.plugin.req.bytes_left() + except: + return 0 + + def getPercent(self): + """ get % of download """ + try: + return int((float(self.plugin.req.dl_arrived) / self.plugin.req.dl_size) * 100) + except: + return 0 + + def getSize(self): + """ get size of download """ + if self.size: return self.size + else: + try: + return self.plugin.req.dl_size + except: + return 0 class PyPackage(): - def __init__(self, manager, id, name, folder, site, password, queue): - self.m = manager - self.m.packageCache[int(id)] = self - - self.id = int(id) - self.name = name - self.folder = folder - self.site = site - self.password = password - self.queue = queue - - def toDict(self): - """return data as dict - - format: - - { - id: {'name': name ... 'links': {} } } - } - - """ - return { - self.id: { - 'name': self.name, - 'folder': self.folder, - 'site': self.site, - 'password': self.password, - 'queue': self.queue, - 'links': {} - } - } - - def getChildren(self): - """get information about contained links""" - raise NotImplementedError - - def sync(self): - """sync with db""" - self.m.updatePackage(self) - - def release(self): - """sync and delete from cache""" - self.sync() - self.m.releasePackage(self.id) - - def delete(self): - self.m.deletePackage(self.id) + def __init__(self, manager, id, name, folder, site, password, queue): + self.m = manager + self.m.packageCache[int(id)] = self + + self.id = int(id) + self.name = name + self.folder = folder + self.site = site + self.password = password + self.queue = queue + + def toDict(self): + """return data as dict + + format: + + { + id: {'name': name ... 'links': {} } } + } + + """ + return { + self.id: { + 'name': self.name, + 'folder': self.folder, + 'site': self.site, + 'password': self.password, + 'queue': self.queue, + 'links': {} + } + } + + def getChildren(self): + """get information about contained links""" + raise NotImplementedError + + def sync(self): + """sync with db""" + self.m.updatePackage(self) + + def release(self): + """sync and delete from cache""" + self.sync() + self.m.releasePackage(self.id) + + def delete(self): + self.m.deletePackage(self.id) if __name__ == "__main__": - pypath = "." + pypath = "." - db = FileHandler(None) + db = FileHandler(None) - #p = PyFile(db, 5) - #sleep(0.1) + #p = PyFile(db, 5) + #sleep(0.1) - a = time() + a = time() - #print db.addPackage("package", "folder" , 1) - - #print db.addPackage("package", "folder", 1) + #print db.addPackage("package", "folder" , 1) + + #print db.addPackage("package", "folder", 1) - #db.addLinks([x for x in range(0,200)], 5) + #db.addLinks([x for x in range(0,200)], 5) - db.save() + db.save() - b = time() - print "adding 200 links, single sql execs, no commit", b-a + b = time() + print "adding 200 links, single sql execs, no commit", b-a - res = db.getCompleteData(1) - #print res - r = [ len(x["links"]) for x in res.itervalues() ] - print r - c = time() - print "get all links", c-b + res = db.getCompleteData(1) + #print res + r = [ len(x["links"]) for x in res.itervalues() ] + print r + c = time() + print "get all links", c-b - #i = 2 - #db.updateLink(i, "urlupdate%s" % i, "nameupdate%s" % i, i, i, i,i) + #i = 2 + #db.updateLink(i, "urlupdate%s" % i, "nameupdate%s" % i, i, i, i,i) - d = time() - print "update one link", d-c + d = time() + print "update one link", d-c - #p.sync() - #p.remove() + #p.sync() + #p.remove() - e = time() - print "sync and remove link", e-d + e = time() + print "sync and remove link", e-d - db.save() + db.save() - db.deletePackage(1) - #db.commit() + db.deletePackage(1) + #db.commit() - f = time() - print "commit, remove package/links, commit", f-e + f = time() + print "commit, remove package/links, commit", f-e - #db.commit() - sleep(0.5) + #db.commit() + sleep(0.5) - g = time() - print "empty commit", g-f -0.5 + g = time() + print "empty commit", g-f -0.5 - job = db.getJob("") - print job - - h = time() - #print db.getFileCount() - - print "get job", h-g + job = db.getJob("") + print job + + h = time() + #print db.getFileCount() + + print "get job", h-g - print db.getFileCount() - - i = time() - - print "file count", i-h - - - print db.getJob("") - - j = time() - - - print "get job 2", j-i - - for i in db.cache.itervalues(): - i.sync() - - sleep(1) - -
\ No newline at end of file + print db.getFileCount() + + i = time() + + print "file count", i-h + + + print db.getJob("") + + j = time() + + + print "get job 2", j-i + + for i in db.cache.itervalues(): + i.sync() + + sleep(1) + + |