diff options
author | RaNaN <Mast3rRaNaN@hotmail.de> | 2010-12-27 21:18:29 +0100 |
---|---|---|
committer | RaNaN <Mast3rRaNaN@hotmail.de> | 2010-12-27 21:18:29 +0100 |
commit | 9509a6444bbb538e136ed899d94aab32be629383 (patch) | |
tree | ac8532b20912a3e5be6ff73443520a7f31f5806a /module/network/HTTPChunk.py | |
parent | encoding fix (diff) | |
download | pyload-9509a6444bbb538e136ed899d94aab32be629383.tar.xz |
new curl download backend - support for chunked dl, resume
Diffstat (limited to 'module/network/HTTPChunk.py')
-rw-r--r-- | module/network/HTTPChunk.py | 353 |
1 files changed, 176 insertions, 177 deletions
diff --git a/module/network/HTTPChunk.py b/module/network/HTTPChunk.py index 02134ca63..0c184db94 100644 --- a/module/network/HTTPChunk.py +++ b/module/network/HTTPChunk.py @@ -14,185 +14,184 @@ You should have received a copy of the GNU General Public License along with this program; if not, see <http://www.gnu.org/licenses/>. - @author: mkaay + @author: RaNaN """ - -from HTTPBase import HTTPBase -from urllib2 import HTTPError -from helper import * +from os import remove +from os.path import exists from time import sleep -from traceback import print_exc -from module.plugins.Plugin import Abort -from module.plugins.Plugin import Fail - -class HTTPChunk(HTTPBase): - def __init__(self, url, fh, get={}, post={}, referer=None, cookies=True, customHeaders={}, range=None, bucket=None, interface=None, proxies={}): - HTTPBase.__init__(self, interface=interface, proxies=proxies) - - self.url = url - self.bucket = bucket - self.range = range - self.noRangeHeader = False - self.fh = fh - - self.get = get - self.post = post - self.referer = referer - self.cookies = cookies - self.customHeaders = customHeaders - - self.deferred = Deferred() - - self.abort = False - self.finished = False - - self.arrived = 0 - - self.startTime = None - self.endTime = None - - self.speed = 0 #byte/sec - self.speedCalcTime = None - self.speedCalcLen = 0 - - self.bufferSize = 18*1024 #tune if performance is poor - self.resp = None - - def getSpeed(self): - return self.speed - - @threaded - def _download(self, resp): - self.arrived = 0 - self.lastSpeed = self.startTime = inttime() - - if self.noRangeHeader and not self.range[0] == 0: - self.deferred.error("range starts not at 0") - - running = True - while running: - if self.abort: - break - count = self.bufferSize - if self.noRangeHeader: - count = min(count, self.range[1] - self.arrived) - if self.bucket: - count = self.bucket.add(count) - if not count: - sleep(0.01) - continue - - try: - data = resp.read(count) - except: - self.deferred.error(Fail, "timeout") - break - - if self.speedCalcTime < inttime(): - self.speed = self.speedCalcLen - self.speedCalcTime = inttime() - self.speedCalcLen = 0 - try: - self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100)) - except: - pass - size = len(data) - - self.arrived += size - self.speedCalcLen += size - - if self.noRangeHeader and self.arrived == self.range[1]: - running = False - - if size: - self.fh.write(data) - else: - break - - self.speed = 0 - self.endTime = inttime() - self.finished = True - self.fh.close() - - if self.abort: - self.deferred.error(Abort) - elif self.size == self.arrived: - self.deferred.callback() - else: - print self.arrived, self.size - self.deferred.error(Fail, "wrong content-length") - - def getEncoding(self): - try: - if self.resp.info()["Content-Encoding"] in ("gzip", "deflate"): - return self.resp.info()["Content-Encoding"] - except: - pass - return "plain" - - def download(self): - if self.range: - self.customHeaders["Range"] = "bytes=%i-%i" % self.range - try: - resp = self.getResponse(self.url, self.get, self.post, self.referer, self.cookies, self.customHeaders) - self.resp = resp - except HTTPError, e: - print_exc() - self.deferred.error(e) - return self.deferred - - if resp.getcode() in (200, 206): - self._download(resp) + +import pycurl + +from HTTPRequest import HTTPRequest + +class WrongFormat(Exception): + pass + +class ChunkInfo(): + def __init__(self, name): + self.name = name + self.size = 0 + self.resume = False + self.chunks = [] + + def setSize(self, size): + self.size = int(size) + + def addChunk(self, name, range): + self.chunks.append((name, range)) + + def clear(self): + self.chunks = [] + + def createChunks(self, chunks): + self.clear() + chunk_size = self.size / chunks + + current = 0 + for i in range(chunks): + end = self.size-1 if (i == chunks-1) else current+chunk_size + self.addChunk("%s.chunk%s" % (self.name, i), (current, end)) + current += chunk_size + 1 + + + def save(self): + fh = open("%s.chunks" % self.name, "w") + fh.write("name:%s\n" % self.name) + fh.write("size:%s\n" % self.size) + for i, c in enumerate(self.chunks): + fh.write("#%d:\n" % i) + fh.write("\tname:%s\n" % c[0]) + fh.write("\trange:%i-%i\n" % c[1]) + fh.close() + + @staticmethod + def load(name): + if not exists("%s.chunks" % name): + raise IOError() + fh = open("%s.chunks" % name, "r") + name = fh.readline()[:-1] + size = fh.readline()[:-1] + if name.startswith("name:") and size.startswith("size:"): + name = name[5:] + size = size[5:] else: - self.deferred.error(resp.getcode(), resp) - return self.deferred - -if __name__ == "__main__": - import sys - from Bucket import Bucket - bucket = Bucket() - bucket.setRate(200*1000) - #bucket = None - - url = "http://speedtest.netcologne.de/test_100mb.bin" - - finished = 0 - def err(*a, **b): - print a, b - def callb(*a, **b): - global finished - finished += 1 - print a, b - - print "starting" - - conns = 4 - - chunks = [] - for a in range(conns): - fh = open("file.part%d" % a, "wb") - chunk = HTTPChunk(url, fh, bucket=bucket, range=(a*5*1024*1024, (a+1)*5*1024*1024)) - print "fireing chunk #%d" % a - d = chunk.download() - d.addCallback(callb) - d.addErrback(err) - chunks.append(chunk) - - try: + fh.close() + raise WrongFormat() + ci = ChunkInfo(name) + ci.loaded = True + ci.setSize(size) while True: - aspeed = 0 - for a, chunk in enumerate(chunks): - if not chunk.finished: - print "#%d" % a, chunk.getSpeed()/1024, "kb/s" - else: - print "#%d" % a, "finished" - aspeed += chunk.getSpeed() - print "sum", aspeed/1024 - if finished == conns: - print "- finished" + if not fh.readline(): #skip line break - sleep(1) - except KeyboardInterrupt: - for chunk in chunks: - chunk.abort = True - sys.exit() + name = fh.readline()[1:-1] + range = fh.readline()[1:-1] + if name.startswith("name:") and range.startswith("range:"): + name = name[5:] + range = range[6:].split("-") + else: + raise WrongFormat() + + ci.addChunk(name, (long(range[0]), long(range[1]))) + fh.close() + return ci + + def remove(self): + if exists("%s.chunks" % self.name): remove("%s.chunks" % self.name) + + def getCount(self): + return len(self.chunks) + + def getChunkName(self, index): + return self.chunks[index][0] + + def getChunkRange(self, index): + return self.chunks[index][1] + +class HTTPChunk(HTTPRequest): + def __init__(self, id, parent, range=None, resume=False): + self.id = id + self.p = parent # HTTPDownload instance + self.range = range # tuple (start, end) + self.resume = resume + + self.arrived = 0 + self.lastURL = self.p.referer + + self.c = pycurl.Curl() + + self.header = "" + self.headerParsed = False #indicates if the header has been processed + + self.fp = None #file handle + + self.initHandle() + self.setInterface(self.p.interface, self.p.proxies) + + @property + def cj(self): + return self.p.cj + + def getHandle(self): + """ returns a Curl handle ready to use for perform/multiperform """ + + self.setRequestContext(self.p.url, self.p.get, self.p.post, self.p.referer, self.p.cj) + self.c.setopt(pycurl.WRITEFUNCTION, self.writeBody) + self.c.setopt(pycurl.HEADERFUNCTION, self.writeHeader) + + if self.resume: + self.fp = open(self.p.info.getChunkName(self.id), "ab") + self.arrived = self.fp.tell() + + if self.range: + #print "Chunked resume with range %i-%i" % (self.arrived+self.range[0], self.range[1]) + self.c.setopt(pycurl.RANGE, "%i-%i" % (self.arrived+self.range[0], self.range[1])) + else: + #print "Resume File from %i" % self.arrived + self.c.setopt(pycurl.RESUME_FROM, self.arrived) + + else: + if self.range: + #print "Chunked with range %i-%i" % self.range + self.c.setopt(pycurl.RANGE, "%i-%i" % self.range) + + self.fp = open(self.p.info.getChunkName(self.id), "wb") + + return self.c + + def writeHeader(self, buf): + self.header += buf + #@TODO forward headers?, this is possibly unneeeded, when we just parse valid 200 headers + # as first chunk, we will parse the headers + if self.header.endswith("\r\n\r\n") and not self.range: + self.parseHeader() + + def writeBody(self, buf): + size = len(buf) + self.arrived += size + + self.fp.write(buf) + + if self.p.bucket: + sleep(self.p.bucket.consumed(size)) + + if self.range and self.arrived > (self.range[1]-self.range[0]): + return 0 #close if we have enough data + + + def parseHeader(self): + """parse data from recieved header""" + for line in self.header.splitlines(): + line = line.strip().lower() + if line.startswith("accept-ranges") and "bytes" in line: + self.p.chunkSupport = True + + if not self.resume and line.startswith("content-length"): + self.p.size = int(line.split(":")[1]) + + self.headerParsed = True + + def close(self): + """ closes everything, unusable after this """ + if self.fp: self.fp.close() + self.c.close() + if hasattr(self, "p"): del self.p
\ No newline at end of file |