#!/usr/bin/env python
# -*- coding: utf-8 -*-
###############################################################################
#   Copyright(c) 2008-2012 pyLoad Team
#   http://www.pyload.org
#
#   This file is part of pyLoad.
#   pyLoad is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Affero General Public License as
#   published by the Free Software Foundation, either version 3 of the
#   License, or (at your option) any later version.
#
#   Subjected to the terms and conditions in LICENSE
#
#   @author: RaNaN
###############################################################################

from os import remove
from os.path import dirname
from time import time
from shutil import move

import pycurl

from pyload.plugins.Base import Abort
from pyload.network.CookieJar import CookieJar
from pyload.utils.fs import save_join, fs_encode

from ..Download import Download
from .CurlChunk import ChunkInfo, CurlChunk
from .CurlRequest import ResponseException

# TODO: save content-disposition for resuming


class CurlDownload(Download):
    """ Loads a URL; HTTP and FTP are supported. """

    # def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None,
    #              options={}, disposition=False):

    CONTEXT_CLASS = CookieJar

    def __init__(self, *args, **kwargs):
        Download.__init__(self, *args, **kwargs)

        self.path = None
        self.disposition = False

        self.chunks = []
        self.chunkSupport = None

        self.m = pycurl.CurlMulti()

        #needed for speed calculation
        self.lastArrived = []
        self.speeds = []
        self.lastSpeeds = [0, 0]

    @property
    def speed(self):
        last = [sum(x) for x in self.lastSpeeds if x]
        return (sum(self.speeds) + sum(last)) / (1 + len(last))

    @property
    def arrived(self):
        return sum(c.arrived for c in self.chunks) if self.chunks else self._size

    @property
    def name(self):
        return self._name if self.disposition else None

    def _copyChunks(self):
        init = fs_encode(self.info.getChunkName(0)) #initial chunk name

        if self.info.getCount() > 1:
            fo = open(init, "rb+") #first chunkfile
            for i in range(1, self.info.getCount()):
                #seek to the end of the previous chunk's range, to get rid of overlapping chunks
                fo.seek(self.info.getChunkRange(i - 1)[1] + 1)
                #input file
                fname = fs_encode("%s.chunk%d" % (self.path, i))
                fi = open(fname, "rb")
                buf = 32 * 1024
                while True: #copy in chunks, consumes less memory
                    data = fi.read(buf)
                    if not data:
                        break
                    fo.write(data)
                fi.close()
                if fo.tell() < self.info.getChunkRange(i)[1]:
                    fo.close()
                    remove(init)
                    self.info.remove() #there are probably invalid chunks
                    raise Exception("Downloaded content was smaller than expected. "
                                    "Try to reduce download connections.")
                remove(fname) #remove chunk
            fo.close()

        if self.name:
            self.path = save_join(dirname(self.path), self.name)

        move(init, fs_encode(self.path))
        self.info.remove() #remove info file
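
    # Illustrative note (added): _copyChunks() relies on the on-disk layout
    # written by ChunkInfo/CurlChunk -- chunk i covers the byte range
    # info.getChunkRange(i) == (start_i, end_i), and adjacent chunks may
    # overlap at the seam. Seeking the chunk-0 file to end_{i-1} + 1 before
    # appending chunk i discards that overlap. For a three-chunk download:
    #
    #   <path>.chunk0   bytes 0       .. end_0   (kept; finally moved to <path>)
    #   <path>.chunk1   bytes start_1 .. end_1   (appended, then deleted)
    #   <path>.chunk2   bytes start_2 .. end_2   (appended, then deleted)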

    def checkResume(self):
        try:
            self.info = ChunkInfo.load(self.path)
            self.info.resume = True #resume is only possible with a valid info file
            self._size = self.info.size
            self.infoSaved = True
        except IOError:
            self.info = ChunkInfo(self.path)

    def download(self, uri, path, get={}, post={}, referer=True, disposition=False, chunks=1,
                 resume=False, cookies=True):
        """ Returns the new filename or None. """
        self.url = uri
        self.path = path
        self.disposition = disposition
        self.get = get
        self.post = post
        self.referer = referer
        self.cookies = cookies

        self.checkResume()
        chunks = max(1, chunks)
        resume = self.info.resume and resume

        try:
            self._download(chunks, resume)
        except pycurl.error as e:
            #code 33 - no resume
            code = e.args[0]
            if code == 33:
                # try again without resume
                self.log.debug("Errno 33 -> Restart without resume")

                #remove old handles
                for chunk in self.chunks:
                    self.closeChunk(chunk)

                self._download(chunks, False)
            else:
                raise
        finally:
            self.close()

        return self.name

    def _download(self, chunks, resume):
        if not resume:
            self.info.clear()
            self.info.addChunk("%s.chunk0" % self.path, (0, 0)) #create an initial entry

        self.chunks = []

        init = CurlChunk(0, self, None, resume) #initial chunk that will load the complete file (if needed)

        self.chunks.append(init)
        self.m.add_handle(init.getHandle())

        lastFinishCheck = 0
        lastTimeCheck = 0
        chunksDone = set() # set of curl handles that are finished
        chunksCreated = False
        done = False
        if self.info.getCount() > 1:
            # This is a resume; if we were chunked originally, assume we still can be
            self.chunkSupport = True

        while 1:
            #need to create chunks
            if not chunksCreated and self.chunkSupport and self.size:
                #size will be set later by the first chunk
                if not resume:
                    self.info.setSize(self.size)
                    self.info.createChunks(chunks)
                    self.info.save()

                chunks = self.info.getCount()

                init.setRange(self.info.getChunkRange(0))

                for i in range(1, chunks):
                    c = CurlChunk(i, self, self.info.getChunkRange(i), resume)

                    handle = c.getHandle()
                    if handle:
                        self.chunks.append(c)
                        self.m.add_handle(handle)
                    else:
                        #close immediately
                        self.log.debug("Invalid curl handle -> closed")
                        c.close()

                chunksCreated = True

            while 1:
                ret, num_handles = self.m.perform()
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break

            t = time()

            # reduce these calls
            # when num_q is 0, the loop is exited
            while lastFinishCheck + 0.5 < t:
                # list of failed curl handles
                failed = []
                ex = None # save only the last exception, we can only raise one anyway

                num_q, ok_list, err_list = self.m.info_read()
                for c in ok_list:
                    chunk = self.findChunk(c)
                    try: # check if the header implies success, else add it to the failed list
                        chunk.verifyHeader()
                    except ResponseException as e:
                        self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e)))
                        failed.append(chunk)
                        ex = e
                    else:
                        chunksDone.add(c)

                for c in err_list:
                    curl, errno, msg = c
                    chunk = self.findChunk(curl)

                    #test if the chunk was finished
                    if errno != 23 or "0 !=" not in msg:
                        failed.append(chunk)
                        ex = pycurl.error(errno, msg)
                        self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(ex)))
                        continue

                    try: # check if the header implies success, else add it to the failed list
                        chunk.verifyHeader()
                    except ResponseException as e:
                        self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e)))
                        failed.append(chunk)
                        ex = e
                    else:
                        chunksDone.add(curl)

                if not num_q: # no more info to get
                    # check if init is not finished, so we reset download connections;
                    # note that other chunks are closed and everything is downloaded with the initial connection
                    if failed and init not in failed and init.c not in chunksDone:
                        self.log.error(_("Download chunks failed, fallback to single connection | %s") % str(ex))

                        #list of chunks to clean and remove
                        to_clean = filter(lambda x: x is not init, self.chunks)
                        for chunk in to_clean:
                            self.closeChunk(chunk)
                            self.chunks.remove(chunk)
                            remove(fs_encode(self.info.getChunkName(chunk.id)))

                        #let the first chunk load the rest and update the info file
                        init.resetRange()
                        self.info.clear()
                        self.info.addChunk("%s.chunk0" % self.path, (0, self.size))
                        self.info.save()
                    elif failed:
                        raise ex

                    lastFinishCheck = t

                    if len(chunksDone) >= len(self.chunks):
                        if len(chunksDone) > len(self.chunks):
                            self.log.warning("Finished download chunks size incorrect, please report bug.")
                        done = True #all chunks loaded

                    break

            if done:
                break #all chunks loaded

            # calc speed once per second, averaging over 3 seconds
            if lastTimeCheck + 1 < t:
                diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0)
                        for i, c in enumerate(self.chunks)]

                self.lastSpeeds[1] = self.lastSpeeds[0]
                self.lastSpeeds[0] = self.speeds
                self.speeds = [float(a) / (t - lastTimeCheck) for a in diff]
                self.lastArrived = [c.arrived for c in self.chunks]

                lastTimeCheck = t

            if self.doAbort:
                raise Abort()

            self.m.select(1)

        for chunk in self.chunks:
            chunk.flushFile() #make sure downloads are written to disk

        self._copyChunks()
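
    # Illustrative note (added): the speed property above averages over roughly
    # a three-second window. Once per second the loop in _download() stores the
    # current per-chunk rates in self.speeds and shifts the two previous
    # snapshots through self.lastSpeeds, so with snapshots s0 (current), s1, s2:
    #
    #   speed == (sum(s0) + sum(s1) + sum(s2)) / 3
    #
    # The divisor is 1 + the number of non-empty snapshots, so the value also
    # degrades gracefully during the first two seconds of a download.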

    def findChunk(self, handle):
        """ Linear search to find a chunk (should be ok, since the chunk count is usually low). """
        for chunk in self.chunks:
            if chunk.c == handle:
                return chunk

    def closeChunk(self, chunk):
        try:
            self.m.remove_handle(chunk.c)
        except pycurl.error as e:
            self.log.debug("Error removing chunk: %s" % str(e))
        finally:
            chunk.close()

    def close(self):
        """ Cleanup. """
        for chunk in self.chunks:
            self.closeChunk(chunk)

        #Workaround: pycurl segfaults when closing a multi that never had any curl handles
        if hasattr(self, "m"):
            c = pycurl.Curl()
            self.m.add_handle(c)
            self.m.remove_handle(c)
            c.close()

        self.chunks = []
        if hasattr(self, "m"):
            self.m.close()
            del self.m
        if hasattr(self, "info"):
            del self.info
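

if __name__ == "__main__":
    # A minimal, self-contained sketch (added for illustration, not part of
    # pyLoad) of the bare pycurl.CurlMulti polling loop that _download() above
    # is built around, without the chunk bookkeeping: perform() until
    # E_CALL_MULTI_PERFORM stops, then select() until no handles remain.
    # The URL is a placeholder.
    from io import BytesIO

    buf = BytesIO()
    c = pycurl.Curl()
    c.setopt(pycurl.URL, "http://example.com/")
    c.setopt(pycurl.WRITEFUNCTION, buf.write)

    m = pycurl.CurlMulti()
    m.add_handle(c)

    num_handles = 1
    while num_handles:
        while True:
            ret, num_handles = m.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        if num_handles:
            m.select(1.0) #wait for activity instead of busy-looping

    m.remove_handle(c)
    c.close()
    m.close()
    print "Received %d bytes" % len(buf.getvalue())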