diff options
Diffstat (limited to 'pyload/plugins/network/CurlDownload.py')
-rw-r--r-- | pyload/plugins/network/CurlDownload.py | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/pyload/plugins/network/CurlDownload.py b/pyload/plugins/network/CurlDownload.py new file mode 100644 index 000000000..5de83ec7b --- /dev/null +++ b/pyload/plugins/network/CurlDownload.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +############################################################################### +# Copyright(c) 2008-2012 pyLoad Team +# http://www.pyload.org +# +# This file is part of pyLoad. +# pyLoad is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Subjected to the terms and conditions in LICENSE +# +# @author: RaNaN +############################################################################### + +from os import remove +from os.path import dirname +from time import time +from shutil import move + +import pycurl + +from pyload.plugins.Base import Abort +from pyload.utils.fs import save_join, fs_encode + +from ..Download import Download +from CurlChunk import ChunkInfo, CurlChunk +from CurlRequest import ResponseException + +# TODO: save content-disposition for resuming + +class CurlDownload(Download): + """ loads an url, http + ftp supported """ + + # def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None, + # options={}, disposition=False): + + def __init__(self, *args, **kwargs): + Download.__init__(self, *args, **kwargs) + + self.path = None + self.disposition = False + + self.chunks = [] + self.chunkSupport = None + + self.m = pycurl.CurlMulti() + + #needed for speed calculation + self.lastArrived = [] + self.speeds = [] + self.lastSpeeds = [0, 0] + + @property + def speed(self): + last = [sum(x) for x in self.lastSpeeds if x] + return (sum(self.speeds) + sum(last)) / (1 + len(last)) + + @property + def arrived(self): + return sum(c.arrived for c in self.chunks) if self.chunks else self._size + + @property + def name(self): + return self._name if self.disposition else None + + def _copyChunks(self): + init = fs_encode(self.info.getChunkName(0)) #initial chunk name + + if self.info.getCount() > 1: + fo = open(init, "rb+") #first chunkfile + for i in range(1, self.info.getCount()): + #input file + fo.seek( + self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks + fname = fs_encode("%s.chunk%d" % (self.path, i)) + fi = open(fname, "rb") + buf = 32 * 1024 + while True: #copy in chunks, consumes less memory + data = fi.read(buf) + if not data: + break + fo.write(data) + fi.close() + if fo.tell() < self.info.getChunkRange(i)[1]: + fo.close() + remove(init) + self.info.remove() #there are probably invalid chunks + raise Exception("Downloaded content was smaller than expected. Try to reduce download connections.") + remove(fname) #remove chunk + fo.close() + + if self.name: + self.filename = save_join(dirname(self.path), self.name) + + move(init, fs_encode(self.path)) + self.info.remove() #remove info file + + def checkResume(self): + try: + self.info = ChunkInfo.load(self.path) + self.info.resume = True #resume is only possible with valid info file + self._size = self.info.size + self.infoSaved = True + except IOError: + self.info = ChunkInfo(self.path) + + def download(self, uri, path, get={}, post={}, referer=True, disposition=False, chunks=1, resume=False): + """ returns new filename or None """ + self.url = uri + self.path = path + self.disposition = disposition + self.get = get + self.post = post + self.referer = referer + + self.checkResume() + chunks = max(1, chunks) + resume = self.info.resume and resume + + try: + self._download(chunks, resume) + except pycurl.error, e: + #code 33 - no resume + code = e.args[0] + if code == 33: + # try again without resume + self.log.debug("Errno 33 -> Restart without resume") + + #remove old handles + for chunk in self.chunks: + self.closeChunk(chunk) + + return self._download(chunks, False) + else: + raise + finally: + self.close() + + return self.name + + def _download(self, chunks, resume): + if not resume: + self.info.clear() + self.info.addChunk("%s.chunk0" % self.path, (0, 0)) #create an initial entry + + self.chunks = [] + + init = CurlChunk(0, self, None, resume) #initial chunk that will load complete file (if needed) + + self.chunks.append(init) + self.m.add_handle(init.getHandle()) + + lastFinishCheck = 0 + lastTimeCheck = 0 + chunksDone = set() # list of curl handles that are finished + chunksCreated = False + done = False + if self.info.getCount() > 1: # This is a resume, if we were chunked originally assume still can + self.chunkSupport = True + + while 1: + #need to create chunks + if not chunksCreated and self.chunkSupport and self.size: #will be set later by first chunk + + if not resume: + self.info.setSize(self.size) + self.info.createChunks(chunks) + self.info.save() + + chunks = self.info.getCount() + + init.setRange(self.info.getChunkRange(0)) + + for i in range(1, chunks): + c = CurlChunk(i, self, self.info.getChunkRange(i), resume) + + handle = c.getHandle() + if handle: + self.chunks.append(c) + self.m.add_handle(handle) + else: + #close immediately + self.log.debug("Invalid curl handle -> closed") + c.close() + + chunksCreated = True + + while 1: + ret, num_handles = self.m.perform() + if ret != pycurl.E_CALL_MULTI_PERFORM: + break + + t = time() + + # reduce these calls + # when num_q is 0, the loop is exited + while lastFinishCheck + 0.5 < t: + # list of failed curl handles + failed = [] + ex = None # save only last exception, we can only raise one anyway + + num_q, ok_list, err_list = self.m.info_read() + for c in ok_list: + chunk = self.findChunk(c) + try: # check if the header implies success, else add it to failed list + chunk.verifyHeader() + except ResponseException, e: + self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e))) + failed.append(chunk) + ex = e + else: + chunksDone.add(c) + + for c in err_list: + curl, errno, msg = c + chunk = self.findChunk(curl) + #test if chunk was finished + if errno != 23 or "0 !=" not in msg: + failed.append(chunk) + ex = pycurl.error(errno, msg) + self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(ex))) + continue + + try: # check if the header implies success, else add it to failed list + chunk.verifyHeader() + except ResponseException, e: + self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e))) + failed.append(chunk) + ex = e + else: + chunksDone.add(curl) + if not num_q: # no more info to get + + # check if init is not finished so we reset download connections + # note that other chunks are closed and everything downloaded with initial connection + if failed and init not in failed and init.c not in chunksDone: + self.log.error(_("Download chunks failed, fallback to single connection | %s" % (str(ex)))) + + #list of chunks to clean and remove + to_clean = filter(lambda x: x is not init, self.chunks) + for chunk in to_clean: + self.closeChunk(chunk) + self.chunks.remove(chunk) + remove(fs_encode(self.info.getChunkName(chunk.id))) + + #let first chunk load the rest and update the info file + init.resetRange() + self.info.clear() + self.info.addChunk("%s.chunk0" % self.filename, (0, self.size)) + self.info.save() + elif failed: + raise ex + + lastFinishCheck = t + + if len(chunksDone) >= len(self.chunks): + if len(chunksDone) > len(self.chunks): + self.log.warning("Finished download chunks size incorrect, please report bug.") + done = True #all chunks loaded + + break + + if done: + break #all chunks loaded + + # calc speed once per second, averaging over 3 seconds + if lastTimeCheck + 1 < t: + diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in + enumerate(self.chunks)] + + self.lastSpeeds[1] = self.lastSpeeds[0] + self.lastSpeeds[0] = self.speeds + self.speeds = [float(a) / (t - lastTimeCheck) for a in diff] + self.lastArrived = [c.arrived for c in self.chunks] + lastTimeCheck = t + + if self.doAbort: + raise Abort() + + self.m.select(1) + + for chunk in self.chunks: + chunk.flushFile() #make sure downloads are written to disk + + self._copyChunks() + + def findChunk(self, handle): + """ linear search to find a chunk (should be ok since chunk size is usually low) """ + for chunk in self.chunks: + if chunk.c == handle: return chunk + + def closeChunk(self, chunk): + try: + self.m.remove_handle(chunk.c) + except pycurl.error, e: + self.log.debug("Error removing chunk: %s" % str(e)) + finally: + chunk.close() + + def close(self): + """ cleanup """ + for chunk in self.chunks: + self.closeChunk(chunk) + else: + #Workaround: pycurl segfaults when closing multi, that never had any curl handles + if hasattr(self, "m"): + c = pycurl.Curl() + self.m.add_handle(c) + self.m.remove_handle(c) + c.close() + + self.chunks = [] + if hasattr(self, "m"): + self.m.close() + del self.m + if hasattr(self, "cj"): + del self.cj + if hasattr(self, "info"): + del self.info
\ No newline at end of file |