#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see .
@author: RaNaN
"""
from Queue import Queue
from threading import Thread
from os import listdir
from os.path import join
from time import sleep, time, strftime
from traceback import print_exc, format_exc
from pprint import pformat
from sys import exc_info, exc_clear
from types import MethodType
from pycurl import error
from PyFile import PyFile
from plugins.Plugin import Abort, Fail, Reconnect, Retry, SkipDownload
from common.packagetools import parseNames
from utils import save_join
from remote.thriftbackend.thriftgen.pyload.ttypes import OnlineStatus
class PluginThread(Thread):
"""abstract base class for thread types"""
#----------------------------------------------------------------------
def __init__(self, manager):
"""Constructor"""
Thread.__init__(self)
self.setDaemon(True)
self.m = manager #thread manager
def writeDebugReport(self, pyfile):
""" writes a
:return:
"""
dump_name = "debug_%s_%s" % (pyfile.pluginname, strftime("%d-%m-%Y_%H-%M-%S"))
dump = self.getDebugDump(pyfile)
try:
import zipfile
dump_name += ".zip"
zip = zipfile.ZipFile(dump_name, "w")
for f in listdir(join("tmp", pyfile.pluginname)):
try:
# avoid encoding errors
zip.write(join("tmp", pyfile.pluginname, f), save_join(pyfile.pluginname, f))
except:
pass
zip.writestr(save_join(pyfile.pluginname, "debug_Report.txt"), dump)
zip.close()
except:
dump_name += ".txt"
f = open(dump_name, "wb")
f.write(dump)
f.close()
self.m.core.log.info("Debug Report written to %s" % dump_name)
def getDebugDump(self, pyfile):
dump = "pyLoad %s Debug Report of %s %s \n\nTRACEBACK:\n %s \n\nFRAMESTACK:\n" % (
self.m.core.api.getServerVersion(), pyfile.pluginname, pyfile.plugin.__version__, format_exc())
tb = exc_info()[2]
stack = []
while tb:
stack.append(tb.tb_frame)
tb = tb.tb_next
for frame in stack[1:]:
dump += "\nFrame %s in %s at line %s\n" % (frame.f_code.co_name,
frame.f_code.co_filename,
frame.f_lineno)
for key, value in frame.f_locals.items():
dump += "\t%20s = " % key
try:
dump += pformat(value) + "\n"
except Exception, e:
dump += " " + str(e) + "\n"
del frame
del stack #delete it just to be sure...
dump += "\n\nPLUGIN OBJECT DUMP: \n\n"
for name in dir(pyfile.plugin):
attr = getattr(pyfile.plugin, name)
if not name.endswith("__") and type(attr) != MethodType:
dump += "\t%20s = " % name
try:
dump += pformat(attr) + "\n"
except Exception, e:
dump += " " + str(e) + "\n"
dump += "\nPYFILE OBJECT DUMP: \n\n"
for name in dir(pyfile):
attr = getattr(pyfile, name)
if not name.endswith("__") and type(attr) != MethodType:
dump += "\t%20s = " % name
try:
dump += pformat(attr) + "\n"
except Exception, e:
dump += " " + str(e) + "\n"
if pyfile.pluginname in self.m.core.config.plugin:
dump += "\n\nCONFIG: \n\n"
dump += pformat(self.m.core.config.plugin[pyfile.pluginname]) + "\n"
return dump
def clean(self, pyfile):
""" set thread unactive and release pyfile """
self.active = False
pyfile.release()
class DownloadThread(PluginThread):
"""thread for downloading files from 'real' hoster plugins"""
#----------------------------------------------------------------------
def __init__(self, manager):
"""Constructor"""
PluginThread.__init__(self, manager)
self.queue = Queue() # job queue
self.active = False
self.start()
#----------------------------------------------------------------------
def run(self):
"""run method"""
pyfile = None
while True:
del pyfile
self.active = self.queue.get()
pyfile = self.active
if self.active == "quit":
self.active = False
self.m.threads.remove(self)
return True
try:
if not pyfile.hasPlugin(): continue
#this pyfile was deleted while queueing
pyfile.plugin.checkForSameFiles(starting=True)
self.m.log.info(_("Download starts: %s" % pyfile.name))
# start download
self.m.core.hookManager.downloadStarts(pyfile)
pyfile.plugin.preprocessing(self)
except NotImplementedError:
self.m.log.error(_("Plugin %s is missing a function.") % pyfile.pluginname)
pyfile.setStatus("failed")
pyfile.error = "Plugin does not work"
self.clean(pyfile)
continue
except Abort:
try:
self.m.log.info(_("Download aborted: %s") % pyfile.name)
except:
pass
pyfile.setStatus("aborted")
self.clean(pyfile)
continue
except Reconnect:
self.queue.put(pyfile)
#pyfile.req.clearCookies()
while self.m.reconnecting.isSet():
sleep(0.5)
continue
except Retry, e:
reason = e.args[0]
self.m.log.info(_("Download restarted: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": reason})
self.queue.put(pyfile)
continue
except Fail, e:
msg = e.args[0]
if msg == "offline":
pyfile.setStatus("offline")
self.m.log.warning(_("Download is offline: %s") % pyfile.name)
elif msg == "temp. offline":
pyfile.setStatus("temp. offline")
self.m.log.warning(_("Download is temporary offline: %s") % pyfile.name)
else:
pyfile.setStatus("failed")
self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": msg})
pyfile.error = msg
self.m.core.hookManager.downloadFailed(pyfile)
self.clean(pyfile)
continue
except error, e:
if len(e.args) == 2:
code, msg = e.args
else:
code = 0
msg = e.args
self.m.log.debug("pycurl exception %s: %s" % (code, msg))
if code in (7, 18, 28, 52, 56):
self.m.log.warning(_("Couldn't connect to host or connection reset, waiting 1 minute and retry."))
wait = time() + 60
pyfile.waitUntil = wait
pyfile.setStatus("waiting")
while time() < wait:
sleep(1)
if pyfile.abort:
break
if pyfile.abort:
self.m.log.info(_("Download aborted: %s") % pyfile.name)
pyfile.setStatus("aborted")
self.clean(pyfile)
else:
self.queue.put(pyfile)
continue
else:
pyfile.setStatus("failed")
self.m.log.error("pycurl error %s: %s" % (code, msg))
if self.m.core.debug:
print_exc()
self.writeDebugReport(pyfile)
self.m.core.hookManager.downloadFailed(pyfile)
self.clean(pyfile)
continue
except SkipDownload, e:
pyfile.setStatus("skipped")
self.m.log.info(
_("Download skipped: %(name)s due to %(plugin)s") % {"name": pyfile.name, "plugin": e.message})
self.clean(pyfile)
self.m.core.files.checkPackageFinished(pyfile)
self.active = False
self.m.core.files.save()
continue
except Exception, e:
pyfile.setStatus("failed")
self.m.log.warning(_("Download failed: %(name)s | %(msg)s") % {"name": pyfile.name, "msg": str(e)})
pyfile.error = str(e)
if self.m.core.debug:
print_exc()
self.writeDebugReport(pyfile)
self.m.core.hookManager.downloadFailed(pyfile)
self.clean(pyfile)
continue
finally:
self.m.core.files.save()
pyfile.checkIfProcessed()
exc_clear()
self.m.log.info(_("Download finished: %s") % pyfile.name)
#pyfile.plugin.req.clean()
self.m.core.hookManager.downloadFinished(pyfile)
self.m.core.files.checkPackageFinished(pyfile)
self.active = False
pyfile.finishIfDone()
self.m.core.files.save()
def put(self, job):
"""assing job to thread"""
self.queue.put(job)
def stop(self):
"""stops the thread"""
self.put("quit")
class DecrypterThread(PluginThread):
"""thread for decrypting"""
#----------------------------------------------------------------------
def __init__(self, manager, pyfile):
"""constructor"""
PluginThread.__init__(self, manager)
self.active = pyfile
manager.localThreads.append(self)
pyfile.setStatus("decrypting")
self.start()
#----------------------------------------------------------------------
def run(self):
"""run method"""
pyfile = self.active
retry = False
try:
self.m.log.info(_("Decrypting starts: %s") % self.active.name)
self.active.plugin.preprocessing(self)
except NotImplementedError:
self.m.log.error(_("Plugin %s is missing a function.") % self.active.pluginname)
return
except Fail, e:
msg = e.args[0]
if msg == "offline":
self.active.setStatus("offline")
self.m.log.warning(_("Download is offline: %s") % self.active.name)
else:
self.active.setStatus("failed")
self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": msg})
self.active.error = msg
return
except Abort:
self.m.log.info(_("Download aborted: %s") % pyfile.name)
pyfile.setStatus("aborted")
return
except Retry:
self.m.log.info(_("Retrying %s") % self.active.name)
retry = True
return self.run()
except Exception, e:
self.active.setStatus("failed")
self.m.log.error(_("Decrypting failed: %(name)s | %(msg)s") % {"name": self.active.name, "msg": str(e)})
self.active.error = str(e)
if self.m.core.debug:
print_exc()
self.writeDebugReport(pyfile)
return
finally:
if not retry:
self.active.release()
self.active = False
self.m.core.files.save()
self.m.localThreads.remove(self)
exc_clear()
#self.m.core.hookManager.downloadFinished(pyfile)
#self.m.localThreads.remove(self)
#self.active.finishIfDone()
if not retry:
pyfile.delete()
class HookThread(PluginThread):
"""thread for hooks"""
#----------------------------------------------------------------------
def __init__(self, m, function, pyfile):
"""Constructor"""
PluginThread.__init__(self, m)
self.f = function
self.active = pyfile
m.localThreads.append(self)
if isinstance(pyfile, PyFile):
pyfile.setStatus("processing")
self.start()
def run(self):
self.f(self.active)
self.m.localThreads.remove(self)
if isinstance(self.active, PyFile):
self.active.finishIfDone()
class InfoThread(PluginThread):
def __init__(self, manager, data, pid=-1, rid=-1, add=False):
"""Constructor"""
PluginThread.__init__(self, manager)
self.data = data
self.pid = pid # package id
# [ .. (name, plugin) .. ]
self.rid = rid #result id
self.add = add #add packages instead of return result
self.cache = [] #accumulated data
self.start()
def run(self):
"""run method"""
plugins = {}
container = []
for url, plugin in self.data:
if plugin in plugins:
plugins[plugin].append(url)
else:
plugins[plugin] = [url]
# filter out container plugins
for name in self.m.core.pluginManager.containerPlugins:
if name in plugins:
container.extend([(name, url) for url in plugins[name]])
del plugins[name]
#directly write to database
if self.pid > -1:
for pluginname, urls in plugins.iteritems():
plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
if hasattr(plugin, "getInfo"):
self.fetchForPlugin(pluginname, plugin, urls, self.updateDB)
self.m.core.files.save()
elif self.add:
for pluginname, urls in plugins.iteritems():
plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
if hasattr(plugin, "getInfo"):
self.fetchForPlugin(pluginname, plugin, urls, self.updateCache, True)
else:
#generate default result
result = [(url, 0, 3, url) for url in urls]
self.updateCache(pluginname, result)
packs = parseNames([(name, url) for name, x, y, url in self.cache])
self.m.log.debug("Fetched and generated %d packages" % len(packs))
for k, v in packs:
self.m.core.api.addPackage(k, v)
#empty cache
del self.cache[:]
else: #post the results
for name, url in container:
#attach container content
try:
data = self.decryptContainer(name, url)
except:
print_exc()
self.m.log.error("Could not decrypt container.")
data = []
for url, plugin in data:
if plugin in plugins:
plugins[plugin].append(url)
else:
plugins[plugin] = [url]
self.m.infoResults[self.rid] = {}
for pluginname, urls in plugins.iteritems():
plugin = self.m.core.pluginManager.getPlugin(pluginname, True)
if hasattr(plugin, "getInfo"):
self.fetchForPlugin(pluginname, plugin, urls, self.updateResult, True)
#force to process cache
if self.cache:
self.updateResult(pluginname, [], True)
else:
#generate default result
result = [(url, 0, 3, url) for url in urls]
self.updateResult(pluginname, result, True)
self.m.infoResults[self.rid]["ALL_INFO_FETCHED"] = {}
self.m.timestamp = time() + 5 * 60
def updateDB(self, plugin, result):
self.m.core.files.updateFileInfo(result, self.pid)
def updateResult(self, plugin, result, force=False):
#parse package name and generate result
#accumulate results
self.cache.extend(result)
if len(self.cache) >= 20 or force:
#used for package generating
tmp = [(name, (url, OnlineStatus(name, plugin, "unknown", status, int(size))))
for name, size, status, url in self.cache]
data = parseNames(tmp)
result = {}
for k, v in data.iteritems():
for url, status in v:
status.packagename = k
result[url] = status
self.m.setInfoResults(self.rid, result)
self.cache = []
def updateCache(self, plugin, result):
self.cache.extend(result)
def fetchForPlugin(self, pluginname, plugin, urls, cb, err=None):
try:
result = [] #result loaded from cache
process = [] #urls to process
for url in urls:
if url in self.m.infoCache:
result.append(self.m.infoCache[url])
else:
process.append(url)
if result:
self.m.log.debug("Fetched %d values from cache for %s" % (len(result), pluginname))
cb(pluginname, result)
if process:
self.m.log.debug("Run Info Fetching for %s" % pluginname)
for result in plugin.getInfo(process):
#result = [ .. (name, size, status, url) .. ]
if not type(result) == list: result = [result]
for res in result:
self.m.infoCache[res[3]] = res
cb(pluginname, result)
self.m.log.debug("Finished Info Fetching for %s" % pluginname)
except Exception, e:
self.m.log.warning(_("Info Fetching for %(name)s failed | %(err)s") %
{"name": pluginname, "err": str(e)})
if self.m.core.debug:
print_exc()
# generate default results
if err:
result = [(url, 0, 3, url) for url in urls]
cb(pluginname, result)
def decryptContainer(self, plugin, url):
data = []
# only works on container plugins
self.m.log.debug("Pre decrypting %s with %s" % (url, plugin))
# dummy pyfile
pyfile = PyFile(self.m.core.files, -1, url, url, 0, 0, "", plugin, -1, -1)
pyfile.initPlugin()
# little plugin lifecycle
try:
pyfile.plugin.setup()
pyfile.plugin.loadToDisk()
pyfile.plugin.decrypt(pyfile)
pyfile.plugin.deleteTmp()
for pack in pyfile.plugin.packages:
pyfile.plugin.urls.extend(pack[1])
data = self.m.core.pluginManager.parseUrls(pyfile.plugin.urls)
self.m.log.debug("Got %d links." % len(data))
except Exception, e:
self.m.log.debug("Pre decrypting error: %s" % str(e))
finally:
pyfile.release()
return data