diff options
Diffstat (limited to 'module/network/HTTPDownload.py')
| -rw-r--r-- | module/network/HTTPDownload.py | 475 | 
1 files changed, 173 insertions, 302 deletions
| diff --git a/module/network/HTTPDownload.py b/module/network/HTTPDownload.py index bce698e1e..e3ac09e84 100644 --- a/module/network/HTTPDownload.py +++ b/module/network/HTTPDownload.py @@ -14,337 +14,208 @@      You should have received a copy of the GNU General Public License      along with this program; if not, see <http://www.gnu.org/licenses/>. -    @author: mkaay +    @author: RaNaN  """ -from HTTPChunk import HTTPChunk -from helper import * -from os.path import exists, getsize  from os import remove -#from shutil import move, copyfileobj -from zlib import decompressobj, MAX_WBITS +from time import sleep, time +from shutil import move -from cookielib import CookieJar +import pycurl -class WrongFormat(Exception): -    pass +from HTTPRequest import HTTPRequest +from HTTPChunk import ChunkInfo, HTTPChunk -class ChunkInfo(): -    def __init__(self, name): -        self.name = name -        self.size = None -        self.loaded = False -        self.chunks = [] -     -    def setSize(self, size): -        self.size = int(size) -     -    def addChunk(self, name, range, encoding): -        self.chunks.append((name, range, encoding)) -     -    def clear(self): -        self.chunks = [] -        self.loaded = False -     -    def save(self): -        fh = open("%s.chunks" % self.name, "w") -        fh.write("name:%s\n" % self.name) -        fh.write("size:%s\n" % self.size) -        for i, c in enumerate(self.chunks): -            fh.write("#%d:\n" % i) -            fh.write("\tname:%s\n" % c[0]) -            fh.write("\tencoding:%s\n" % c[2]) -            fh.write("\trange:%i-%i\n" % c[1]) -     -    @staticmethod -    def load(name): -        if not exists("%s.chunks" % name): -            raise IOError() -        fh = open("%s.chunks" % name, "r") -        name = fh.readline()[:-1] -        size = fh.readline()[:-1] -        if name.startswith("name:") and size.startswith("size:"): -            name = name[5:] -            size = size[5:] -        else: -            raise WrongFormat() -        ci = ChunkInfo(name) -        ci.loaded = True -        ci.setSize(size) -        while True: -            if not fh.readline(): #skip line -                break -            name = fh.readline()[1:-1] -            encoding = fh.readline()[1:-1] -            range = fh.readline()[1:-1] -            if name.startswith("name:") and encoding.startswith("encoding:") and range.startswith("range:"): -                name = name[5:] -                encoding = encoding[9:] -                range = range[6:].split("-") -            else: -                raise WrongFormat() -             -            ci.addChunk(name, (long(range[0]), long(range[1])), encoding) -        return ci -     -    def removeInfo(self): -        remove("%s.chunks" % self.name) -     -    def getCount(self): -        return len(self.chunks) -     -    def getChunkName(self, index): -        return self.chunks[index][0] -     -    def getChunkRange(self, index): -        return self.chunks[index][1] -         -    def getChunkEncoding(self, index): -        return self.chunks[index][2] - -class WrappedHTTPDeferred(WrappedDeferred): -    pass -     -class HTTPDownload(): -    def __init__(self, url, filename, get={}, post={}, referer=None, cookies=True, customHeaders={}, bucket=None, interface=None, proxies={}): +from module.plugins.Plugin import Abort + +class HTTPDownload(HTTPRequest): +    def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None, +                 interface=None, proxies={}):          self.url = url -        self.filename = filename -        self.interface = interface -        self.proxies = proxies -         +        self.filename = filename  #complete file destination, not only name          self.get = get          self.post = post -                  self.referer = referer -        self.cookies = cookies -         -        self.customHeaders = customHeaders -         +        self.cj = cj  #cookiejar if cookies are needed          self.bucket = bucket -         -        self.deferred = Deferred() -         -        self.finished = False -        self._abort = False -        self.size = None -         -        self.cookieJar = CookieJar() -         +        self.interface = interface +        self.proxies = proxies +        # all arguments + +        self.abort = False +        self.size = 0 +          self.chunks = []          self.chunksDone = 0 +          try:              self.info = ChunkInfo.load(filename) +            self.info.resume = True #resume is only possible with valid info file +            self.size = self.info.size          except IOError:              self.info = ChunkInfo(filename) -        self.noChunkSupport = False -     + +        self.chunkSupport = None +        self.m = pycurl.CurlMulti() + +        #needed for speed calculation +        self.lastChecked = 0 +        self.lastArrived = [] +        self.speeds = [] + + +    @property +    def speed(self): +        return sum(self.speeds) +      @property      def arrived(self): -        arrived = 0 -        try: -            for i in range(self.info.getCount()): -                arrived += getsize(self.info.getChunkName(i)) #ugly, but difficult to calc otherwise due chunk resume -        except OSError: -            arrived = self.size -        return arrived -     -    def setAbort(self, val): -        self._abort = val -        for chunk in self.chunks: -            chunk.abort = val -     -    def getAbort(self): -        return self._abort -     -    abort = property(getAbort, setAbort) -     -    def getSpeed(self): -        speed = 0 -        for chunk in self.chunks: -            speed += chunk.getSpeed() -        return speed -     +        return sum([c.arrived for c in self.chunks]) +      @property -    def speed(self): -        return self.getSpeed() -     -    def calcProgress(self, p): -        self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100)) -     -    def _chunkDone(self): -        self.chunksDone += 1 -        #print self.chunksDone, "/", len(self.chunks) -        if self.chunksDone == len(self.chunks): -            self._copyChunks() -     +    def percent(self): +        if not self.size: return 0 +        return (self.arrived * 100) / self.size +      def _copyChunks(self): -        fo = open(self.filename, "wb") #out file -        for i in range(self.info.getCount()): -            encoding = self.info.getChunkEncoding(i) -             -            #decompress method, if any -            decompress = lambda data: data -            if encoding == "gzip": -                gz = decompressobj(16+MAX_WBITS) -                decompress = lambda data: gz.decompress(data) -            if encoding == "deflate": -                df = decompressobj(-MAX_WBITS) -                decompress = lambda data: df.decompress(data) -             -            #input file -            fname = "%s.chunk%d" % (self.filename, i) -            fi = open(fname, "rb") -            while True: #copy in chunks, consumes less memory -                data = fi.read(512*1024) -                if not data: -                    break -                fo.write(decompress(data)) #decompressing -            fi.close() -            remove(fname) #remove -        fo.close() -        self.info.removeInfo() #remove info file -        self.deferred.callback() #done, emit callbacks -     -    def _createChunk(self, fh, range=None): -        chunk = HTTPChunk(self.url, fh, get=self.get, post=self.post, -                          referer=self.referer, cookies=self.cookies, -                          customHeaders=self.customHeaders, -                          bucket=self.bucket, range=range, -                          interface=self.interface, proxies=self.proxies) -        chunk.cookieJar = self.cookieJar -        return chunk -     -    def _addChunk(self, chunk, d): -        self.chunks.append(chunk) -        d.addProgress("percent", self.calcProgress) -        d.addCallback(self._chunkDone) -        d.addErrback(lambda *args, **kwargs: self.setAbort(True)) -        d.addErrback(self.deferred.error) -     +        init = self.info.getChunkName(0) #initial chunk name + +        if len(self.chunks) > 1: +            fo = open(init, "rb+") #first chunkfile +            for i in range(1, self.info.getCount()): +                #input file +                fo.seek(self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks +                fname = "%s.chunk%d" % (self.filename, i) +                fi = open(fname, "rb") +                buf = 512 * 1024 +                while True: #copy in chunks, consumes less memory +                    data = fi.read(buf) +                    if not data: +                        break +                    fo.write(data) +                fi.close() +                remove(fname) #remove chunk +            fo.close() + +        move(init, self.filename) +        self.info.remove() #remove info file +      def download(self, chunks=1, resume=False): -        self.chunksDone = 0 -        if chunks > 0: -            #diffentent chunk count in info, resetting -            if self.info.loaded and not self.info.getCount() == chunks: -                self.info.clear() -             -            #if resuming, calculate range with offset -            crange = None -            if resume: -                if self.info.getCount() == chunks and exists("%s.chunk0" % (self.filename, )): -                    crange = self.info.getChunkRange(0) -                    crange = (crange[0]+getsize("%s.chunk0" % (self.filename, )), crange[1]-1) -             -            #if firstpart not done -            if crange is None or crange[1]-crange[0] > 0: -                fh = open("%s.chunk0" % (self.filename, ), "ab" if crange else "wb") -                 -                chunk = self._createChunk(fh, range=crange) -                 -                d = chunk.download() #start downloading -                self._addChunk(chunk, d) -                 -                #no info file, need to calculate ranges -                if not self.info.loaded: -                    size = chunk.size #overall size -                    chunksize = size/chunks #chunk size -                     -                    chunk.range = (0, chunksize) #setting range for first part -                    chunk.noRangeHeader = True -                    chunk.size = chunksize #setting size for first chunk -                     -                    self.size = size #setting overall size -                    self.info.setSize(self.size) #saving overall size -                    self.info.addChunk("%s.chunk0" % (self.filename, ), chunk.range, chunk.getEncoding()) #add chunk to infofile +        chunks = max(1, chunks) +        resume = self.info.resume and resume +        self.chunks = [] + +        try: +            self._download(chunks, resume) +        finally: +            self.clean() + +    def _download(self, chunks, resume): +        if not resume: +            self.info.addChunk("%s.chunk0" % self.filename, (0, 0)) + +        init = HTTPChunk(0, self, None, resume) #initial chunk that will load complete file (if needed) + +        self.chunks.append(init) +        self.m.add_handle(init.getHandle()) + +        while 1: +            #need to create chunks +            if len(self.chunks) < chunks and self.chunkSupport and self.size: #will be set later by first chunk + +                if not resume: +                    self.info.setSize(self.size) +                    self.info.createChunks(chunks) +                    self.info.save() + +                chunks = self.info.getCount() + +                init.range = self.info.getChunkRange(0) + +                for i in range(1, chunks): +                    c = HTTPChunk(i, self, self.info.getChunkRange(i), resume) +                    self.chunks.append(c) +                    self.m.add_handle(c.getHandle()) + +            while 1: +                ret, num_handles = self.m.perform() + +                if ret != pycurl.E_CALL_MULTI_PERFORM: +                    break + +            while 1: +                num_q, ok_list, err_list = self.m.info_read() +                for c in ok_list: +                    self.chunksDone += 1 +                for c in err_list: +                    curl, errno, msg = c +                    #test if chunk was finished, otherwise raise the exception +                    if errno != 23 or "0 !=" not in msg: +                        raise pycurl.error(errno, msg) + +                    #@TODO KeyBoardInterrupts are seen as finished chunks, +                    #but normally not handled to this process, only in the testcase -                    lastchunk = size - chunksize*(chunks-1) #calculating size for last chunk -                self.firstchunk = chunk #remeber first chunk -             -            if self.info.loaded and not self.size: -                self.size = self.info.size #setting overall size -             -            for i in range(1, chunks): #other chunks -                cont = False -                if not self.info.loaded: #first time load -                    if i+1 == chunks: #last chunk? -                        rng = (i*chunksize, i*chunksize+lastchunk-1) -                    else: -                        rng = (i*chunksize, (i+1)*chunksize-1) #adjusting range -                else: #info available -                    rng = self.info.getChunkRange(i) #loading previous range -                    if resume and exists("%s.chunk%d" % (self.filename, i)): #continue chunk -                        rng = (rng[0]+getsize("%s.chunk%d" % (self.filename, i)), rng[1]) #adjusting offset -                        cont = True #set append mode -                 -                if rng[1]-rng[0] <= 0: #chunk done -                    continue -                 -                fh = open("%s.chunk%d" % (self.filename, i), "ab" if cont else "wb") -                chunk = self._createChunk(fh, range=rng) -                d = chunk.download() #try -                 -                if not chunk.resp.getcode() == 206 and i == 1: #no range supported, tell first chunk to download everything -                    chunk.abort = True -                    self.noChunkSupport = True -                    self.firstchunk.size = self.size -                    self.firstchunk.range = None -                    self.info.clear() #clear info -                    self.info.addChunk("%s.chunk0" % (self.filename, ), (0, self.firstchunk.size), chunk.getEncoding()) #re-adding info with correct ranges +                    self.chunksDone += 1 +                if not num_q:                      break -                 -                self._addChunk(chunk, d) -                 -                if not self.info.loaded: #adding info -                    self.info.addChunk("%s.chunk%d" % (self.filename, i), chunk.range, chunk.getEncoding()) -             -            self.info.save() #saving info -            if not len(self.chunks): -                self._copyChunks() -            return WrappedHTTPDeferred(self, self.deferred) -        else: -            raise Exception("no chunks") + +            if self.chunksDone == len(self.chunks): +                break #all chunks loaded + +            # calc speed once per second +            t = time() +            if self.lastChecked + 1 < t: +                diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in +                        enumerate(self.chunks)] + +                #for i, c in enumerate(self.chunks): +                #    diff[i] = c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) + +                self.speeds = [float(a) / (t - self.lastChecked) for a in diff] +                self.lastArrived = [c.arrived for c in self.chunks] +                self.lastChecked = t +                #print "------------------------" +                #print self.speed / 1024, "kb/s" +                #print "Arrived:", self.arrived +                #print "Size:", self.size +                #print self.percent, "%" + +            if self.abort: +                raise Abort() + +            sleep(0.001) #supress busy waiting - limits dl speed to  (1 / x) * buffersize +            self.m.select(1) + +        for chunk in self.chunks: +            chunk.fp.close() +            self.m.remove_handle(chunk.c) + +        self._copyChunks()      def clean(self):          """ cleanup """ -        for c in self.chunks: -            c.clean() +        for chunk in self.chunks: +                chunk.close() +                self.m.remove_handle(chunk.c) + +        self.m.close() +        self.chunks = [] +        if hasattr(self, "cj"): +            del self.cj +        if hasattr(self, "info"): +            del self.info  if __name__ == "__main__": -    import sys +    url = "http://speedtest.netcologne.de/test_10mb.bin" +      from Bucket import Bucket +      bucket = Bucket() -    bucket.setRate(200*1024) -    #bucket = None -     -    url = "http://speedtest.netcologne.de/test_100mb.bin" -     -    finished = False -    def err(*a, **b): -        print a, b -    def callb(*a, **b): -        global finished -        finished = True -        print a, b -     +    bucket.setRate(200 * 1024) +    bucket = None +      print "starting" -     -    dwnld = HTTPDownload(url, "test_100mb.bin", bucket=bucket) -    d = dwnld.download(chunks=5, resume=True) -    d.addCallback(callb) -    d.addErrback(err) -     -    try: -        while True: -            for a, chunk in enumerate(dwnld.chunks): -                if not chunk.finished: -                    print "#%d" % a, chunk.getSpeed()/1024, "kb/s", "size", int(float(chunk.arrived)/chunk.size*100), "%" -                else: -                    print "#%d" % a, "finished" -            print "sum", dwnld.speed/1024, dwnld.arrived, "/", dwnld.size, int(float(dwnld.arrived)/dwnld.size*100), "%" -            if finished: -                print "- finished" -                break -            sleep(1) -    except KeyboardInterrupt: -        dwnld.abort = True -        sys.exit() + +    dwnld = HTTPDownload(url, "test_10mb.bin", bucket=bucket) +    dwnld.download(chunks=3, resume=True)
\ No newline at end of file | 
