author    RaNaN <Mast3rRaNaN@hotmail.de>  2010-12-27 21:18:29 +0100
committer RaNaN <Mast3rRaNaN@hotmail.de>  2010-12-27 21:18:29 +0100
commit    9509a6444bbb538e136ed899d94aab32be629383 (patch)
tree      ac8532b20912a3e5be6ff73443520a7f31f5806a /module/network/HTTPDownload.py
parent    encoding fix (diff)
download  pyload-9509a6444bbb538e136ed899d94aab32be629383.tar.xz
new curl download backend - support for chunked dl, resume
Diffstat (limited to 'module/network/HTTPDownload.py')
-rw-r--r--  module/network/HTTPDownload.py  |  475
1 file changed, 173 insertions(+), 302 deletions(-)
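A rough sketch of the pycurl CurlMulti pattern the new backend builds on
(not code from this commit; the URL, chunk count, and part-file names are
illustrative, and it assumes the server reports Content-Length and honors
Range requests): one easy handle per byte range, all driven by a single
perform()/info_read()/select() loop.

import pycurl

url = "http://speedtest.netcologne.de/test_10mb.bin"
chunks = 3

#probe the total size with a HEAD request
probe = pycurl.Curl()
probe.setopt(pycurl.URL, url)
probe.setopt(pycurl.NOBODY, 1)
probe.perform()
size = int(probe.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD))
probe.close()

m = pycurl.CurlMulti()
handles = []
part = size // chunks
for i in range(chunks):
    start = i * part
    end = size - 1 if i == chunks - 1 else (i + 1) * part - 1
    fh = open("part%d" % i, "wb")
    c = pycurl.Curl()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.RANGE, "%d-%d" % (start, end)) #one byte range per handle
    c.setopt(pycurl.WRITEDATA, fh)
    m.add_handle(c)
    handles.append((c, fh))

done = 0
while done < chunks:
    while True:
        ret, active = m.perform()
        if ret != pycurl.E_CALL_MULTI_PERFORM:
            break
    num_q, ok, err = m.info_read()
    done += len(ok)
    if err: #list of (handle, errno, message) tuples
        raise pycurl.error(err[0][1], err[0][2])
    m.select(1) #wait for socket activity instead of busy looping

for c, fh in handles:
    fh.close()
    m.remove_handle(c)
    c.close()
m.close()
#the part files can now be concatenated into the final file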
diff --git a/module/network/HTTPDownload.py b/module/network/HTTPDownload.py
index bce698e1e..e3ac09e84 100644
--- a/module/network/HTTPDownload.py
+++ b/module/network/HTTPDownload.py
@@ -14,337 +14,208 @@
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
- @author: mkaay
+ @author: RaNaN
"""
-from HTTPChunk import HTTPChunk
-from helper import *
-from os.path import exists, getsize
from os import remove
-#from shutil import move, copyfileobj
-from zlib import decompressobj, MAX_WBITS
+from time import sleep, time
+from shutil import move
-from cookielib import CookieJar
+import pycurl
-class WrongFormat(Exception):
- pass
+from HTTPRequest import HTTPRequest
+from HTTPChunk import ChunkInfo, HTTPChunk
-class ChunkInfo():
- def __init__(self, name):
- self.name = name
- self.size = None
- self.loaded = False
- self.chunks = []
-
- def setSize(self, size):
- self.size = int(size)
-
- def addChunk(self, name, range, encoding):
- self.chunks.append((name, range, encoding))
-
- def clear(self):
- self.chunks = []
- self.loaded = False
-
- def save(self):
- fh = open("%s.chunks" % self.name, "w")
- fh.write("name:%s\n" % self.name)
- fh.write("size:%s\n" % self.size)
- for i, c in enumerate(self.chunks):
- fh.write("#%d:\n" % i)
- fh.write("\tname:%s\n" % c[0])
- fh.write("\tencoding:%s\n" % c[2])
- fh.write("\trange:%i-%i\n" % c[1])
-
- @staticmethod
- def load(name):
- if not exists("%s.chunks" % name):
- raise IOError()
- fh = open("%s.chunks" % name, "r")
- name = fh.readline()[:-1]
- size = fh.readline()[:-1]
- if name.startswith("name:") and size.startswith("size:"):
- name = name[5:]
- size = size[5:]
- else:
- raise WrongFormat()
- ci = ChunkInfo(name)
- ci.loaded = True
- ci.setSize(size)
- while True:
- if not fh.readline(): #skip line
- break
- name = fh.readline()[1:-1]
- encoding = fh.readline()[1:-1]
- range = fh.readline()[1:-1]
- if name.startswith("name:") and encoding.startswith("encoding:") and range.startswith("range:"):
- name = name[5:]
- encoding = encoding[9:]
- range = range[6:].split("-")
- else:
- raise WrongFormat()
-
- ci.addChunk(name, (long(range[0]), long(range[1])), encoding)
- return ci
-
- def removeInfo(self):
- remove("%s.chunks" % self.name)
-
- def getCount(self):
- return len(self.chunks)
-
- def getChunkName(self, index):
- return self.chunks[index][0]
-
- def getChunkRange(self, index):
- return self.chunks[index][1]
-
- def getChunkEncoding(self, index):
- return self.chunks[index][2]
-
-class WrappedHTTPDeferred(WrappedDeferred):
- pass
-
-class HTTPDownload():
- def __init__(self, url, filename, get={}, post={}, referer=None, cookies=True, customHeaders={}, bucket=None, interface=None, proxies={}):
+from module.plugins.Plugin import Abort
+
+class HTTPDownload(HTTPRequest):
+ def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None,
+ interface=None, proxies={}):
self.url = url
- self.filename = filename
- self.interface = interface
- self.proxies = proxies
-
+ self.filename = filename #complete file destination, not only name
self.get = get
self.post = post
-
self.referer = referer
- self.cookies = cookies
-
- self.customHeaders = customHeaders
-
+ self.cj = cj #cookiejar if cookies are needed
self.bucket = bucket
-
- self.deferred = Deferred()
-
- self.finished = False
- self._abort = False
- self.size = None
-
- self.cookieJar = CookieJar()
-
+ self.interface = interface
+ self.proxies = proxies
+ # all arguments
+
+ self.abort = False
+ self.size = 0
+
self.chunks = []
self.chunksDone = 0
+
try:
self.info = ChunkInfo.load(filename)
+ self.info.resume = True #resume is only possible with valid info file
+ self.size = self.info.size
except IOError:
self.info = ChunkInfo(filename)
- self.noChunkSupport = False
-
+
+ self.chunkSupport = None
+ self.m = pycurl.CurlMulti()
+
+ #needed for speed calculation
+ self.lastChecked = 0
+ self.lastArrived = []
+ self.speeds = []
+
+
+ @property
+ def speed(self):
+ return sum(self.speeds)
+
@property
def arrived(self):
- arrived = 0
- try:
- for i in range(self.info.getCount()):
- arrived += getsize(self.info.getChunkName(i)) #ugly, but difficult to calc otherwise due to chunk resume
- except OSError:
- arrived = self.size
- return arrived
-
- def setAbort(self, val):
- self._abort = val
- for chunk in self.chunks:
- chunk.abort = val
-
- def getAbort(self):
- return self._abort
-
- abort = property(getAbort, setAbort)
-
- def getSpeed(self):
- speed = 0
- for chunk in self.chunks:
- speed += chunk.getSpeed()
- return speed
-
+ return sum([c.arrived for c in self.chunks])
+
@property
- def speed(self):
- return self.getSpeed()
-
- def calcProgress(self, p):
- self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100))
-
- def _chunkDone(self):
- self.chunksDone += 1
- #print self.chunksDone, "/", len(self.chunks)
- if self.chunksDone == len(self.chunks):
- self._copyChunks()
-
+ def percent(self):
+ if not self.size: return 0
+ return (self.arrived * 100) / self.size
+
def _copyChunks(self):
- fo = open(self.filename, "wb") #out file
- for i in range(self.info.getCount()):
- encoding = self.info.getChunkEncoding(i)
-
- #decompress method, if any
- decompress = lambda data: data
- if encoding == "gzip":
- gz = decompressobj(16+MAX_WBITS)
- decompress = lambda data: gz.decompress(data)
- if encoding == "deflate":
- df = decompressobj(-MAX_WBITS)
- decompress = lambda data: df.decompress(data)
-
- #input file
- fname = "%s.chunk%d" % (self.filename, i)
- fi = open(fname, "rb")
- while True: #copy in chunks, consumes less memory
- data = fi.read(512*1024)
- if not data:
- break
- fo.write(decompress(data)) #decompressing
- fi.close()
- remove(fname) #remove
- fo.close()
- self.info.removeInfo() #remove info file
- self.deferred.callback() #done, emit callbacks
-
- def _createChunk(self, fh, range=None):
- chunk = HTTPChunk(self.url, fh, get=self.get, post=self.post,
- referer=self.referer, cookies=self.cookies,
- customHeaders=self.customHeaders,
- bucket=self.bucket, range=range,
- interface=self.interface, proxies=self.proxies)
- chunk.cookieJar = self.cookieJar
- return chunk
-
- def _addChunk(self, chunk, d):
- self.chunks.append(chunk)
- d.addProgress("percent", self.calcProgress)
- d.addCallback(self._chunkDone)
- d.addErrback(lambda *args, **kwargs: self.setAbort(True))
- d.addErrback(self.deferred.error)
-
+ init = self.info.getChunkName(0) #initial chunk name
+
+ if len(self.chunks) > 1:
+ fo = open(init, "rb+") #first chunkfile
+ for i in range(1, self.info.getCount()):
+ #input file
+ fo.seek(self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks
+ fname = "%s.chunk%d" % (self.filename, i)
+ fi = open(fname, "rb")
+ buf = 512 * 1024
+ while True: #copy in chunks, consumes less memory
+ data = fi.read(buf)
+ if not data:
+ break
+ fo.write(data)
+ fi.close()
+ remove(fname) #remove chunk
+ fo.close()
+
+ move(init, self.filename)
+ self.info.remove() #remove info file
+
def download(self, chunks=1, resume=False):
- self.chunksDone = 0
- if chunks > 0:
- #different chunk count in info, resetting
- if self.info.loaded and not self.info.getCount() == chunks:
- self.info.clear()
-
- #if resuming, calculate range with offset
- crange = None
- if resume:
- if self.info.getCount() == chunks and exists("%s.chunk0" % (self.filename, )):
- crange = self.info.getChunkRange(0)
- crange = (crange[0]+getsize("%s.chunk0" % (self.filename, )), crange[1]-1)
-
- #if firstpart not done
- if crange is None or crange[1]-crange[0] > 0:
- fh = open("%s.chunk0" % (self.filename, ), "ab" if crange else "wb")
-
- chunk = self._createChunk(fh, range=crange)
-
- d = chunk.download() #start downloading
- self._addChunk(chunk, d)
-
- #no info file, need to calculate ranges
- if not self.info.loaded:
- size = chunk.size #overall size
- chunksize = size/chunks #chunk size
-
- chunk.range = (0, chunksize) #setting range for first part
- chunk.noRangeHeader = True
- chunk.size = chunksize #setting size for first chunk
-
- self.size = size #setting overall size
- self.info.setSize(self.size) #saving overall size
- self.info.addChunk("%s.chunk0" % (self.filename, ), chunk.range, chunk.getEncoding()) #add chunk to infofile
+ chunks = max(1, chunks)
+ resume = self.info.resume and resume
+ self.chunks = []
+
+ try:
+ self._download(chunks, resume)
+ finally:
+ self.clean()
+
+ def _download(self, chunks, resume):
+ if not resume:
+ self.info.addChunk("%s.chunk0" % self.filename, (0, 0))
+
+ init = HTTPChunk(0, self, None, resume) #initial chunk that will load complete file (if needed)
+
+ self.chunks.append(init)
+ self.m.add_handle(init.getHandle())
+
+ while 1:
+ #need to create chunks
+ if len(self.chunks) < chunks and self.chunkSupport and self.size: #will be set later by first chunk
+
+ if not resume:
+ self.info.setSize(self.size)
+ self.info.createChunks(chunks)
+ self.info.save()
+
+ chunks = self.info.getCount()
+
+ init.range = self.info.getChunkRange(0)
+
+ for i in range(1, chunks):
+ c = HTTPChunk(i, self, self.info.getChunkRange(i), resume)
+ self.chunks.append(c)
+ self.m.add_handle(c.getHandle())
+
+ while 1:
+ ret, num_handles = self.m.perform()
+
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+
+ while 1:
+ num_q, ok_list, err_list = self.m.info_read()
+ for c in ok_list:
+ self.chunksDone += 1
+ for c in err_list:
+ curl, errno, msg = c
+ #test if chunk was finished, otherwise raise the exception
+ if errno != 23 or "0 !=" not in msg:
+ raise pycurl.error(errno, msg)
+
+ #@TODO KeyboardInterrupts are seen as finished chunks,
+ #but are normally not delivered to this process, only in the testcase
- lastchunk = size - chunksize*(chunks-1) #calculating size for last chunk
- self.firstchunk = chunk #remember first chunk
-
- if self.info.loaded and not self.size:
- self.size = self.info.size #setting overall size
-
- for i in range(1, chunks): #other chunks
- cont = False
- if not self.info.loaded: #first time load
- if i+1 == chunks: #last chunk?
- rng = (i*chunksize, i*chunksize+lastchunk-1)
- else:
- rng = (i*chunksize, (i+1)*chunksize-1) #adjusting range
- else: #info available
- rng = self.info.getChunkRange(i) #loading previous range
- if resume and exists("%s.chunk%d" % (self.filename, i)): #continue chunk
- rng = (rng[0]+getsize("%s.chunk%d" % (self.filename, i)), rng[1]) #adjusting offset
- cont = True #set append mode
-
- if rng[1]-rng[0] <= 0: #chunk done
- continue
-
- fh = open("%s.chunk%d" % (self.filename, i), "ab" if cont else "wb")
- chunk = self._createChunk(fh, range=rng)
- d = chunk.download() #try
-
- if not chunk.resp.getcode() == 206 and i == 1: #no range supported, tell first chunk to download everything
- chunk.abort = True
- self.noChunkSupport = True
- self.firstchunk.size = self.size
- self.firstchunk.range = None
- self.info.clear() #clear info
- self.info.addChunk("%s.chunk0" % (self.filename, ), (0, self.firstchunk.size), chunk.getEncoding()) #re-adding info with correct ranges
+ self.chunksDone += 1
+ if not num_q:
break
-
- self._addChunk(chunk, d)
-
- if not self.info.loaded: #adding info
- self.info.addChunk("%s.chunk%d" % (self.filename, i), chunk.range, chunk.getEncoding())
-
- self.info.save() #saving info
- if not len(self.chunks):
- self._copyChunks()
- return WrappedHTTPDeferred(self, self.deferred)
- else:
- raise Exception("no chunks")
+
+ if self.chunksDone == len(self.chunks):
+ break #all chunks loaded
+
+ # calc speed once per second
+ t = time()
+ if self.lastChecked + 1 < t:
+ diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in
+ enumerate(self.chunks)]
+
+ #for i, c in enumerate(self.chunks):
+ # diff[i] = c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0)
+
+ self.speeds = [float(a) / (t - self.lastChecked) for a in diff]
+ self.lastArrived = [c.arrived for c in self.chunks]
+ self.lastChecked = t
+ #print "------------------------"
+ #print self.speed / 1024, "kb/s"
+ #print "Arrived:", self.arrived
+ #print "Size:", self.size
+ #print self.percent, "%"
+
+ if self.abort:
+ raise Abort()
+
+ sleep(0.001) #suppress busy waiting - limits dl speed to (1 / x) * buffersize
+ self.m.select(1)
+
+ for chunk in self.chunks:
+ chunk.fp.close()
+ self.m.remove_handle(chunk.c)
+
+ self._copyChunks()
def clean(self):
""" cleanup """
- for c in self.chunks:
- c.clean()
+ for chunk in self.chunks:
+ chunk.close()
+ self.m.remove_handle(chunk.c)
+
+ self.m.close()
+ self.chunks = []
+ if hasattr(self, "cj"):
+ del self.cj
+ if hasattr(self, "info"):
+ del self.info
if __name__ == "__main__":
- import sys
+ url = "http://speedtest.netcologne.de/test_10mb.bin"
+
from Bucket import Bucket
+
bucket = Bucket()
- bucket.setRate(200*1024)
- #bucket = None
-
- url = "http://speedtest.netcologne.de/test_100mb.bin"
-
- finished = False
- def err(*a, **b):
- print a, b
- def callb(*a, **b):
- global finished
- finished = True
- print a, b
-
+ bucket.setRate(200 * 1024)
+ bucket = None
+
print "starting"
-
- dwnld = HTTPDownload(url, "test_100mb.bin", bucket=bucket)
- d = dwnld.download(chunks=5, resume=True)
- d.addCallback(callb)
- d.addErrback(err)
-
- try:
- while True:
- for a, chunk in enumerate(dwnld.chunks):
- if not chunk.finished:
- print "#%d" % a, chunk.getSpeed()/1024, "kb/s", "size", int(float(chunk.arrived)/chunk.size*100), "%"
- else:
- print "#%d" % a, "finished"
- print "sum", dwnld.speed/1024, dwnld.arrived, "/", dwnld.size, int(float(dwnld.arrived)/dwnld.size*100), "%"
- if finished:
- print "- finished"
- break
- sleep(1)
- except KeyboardInterrupt:
- dwnld.abort = True
- sys.exit()
+
+ dwnld = HTTPDownload(url, "test_10mb.bin", bucket=bucket)
+ dwnld.download(chunks=3, resume=True)
\ No newline at end of file