path: root/module/network/
diff options
Diffstat (limited to 'module/network/')
1 files changed, 173 insertions, 302 deletions
diff --git a/module/network/ b/module/network/
index bce698e1e..e3ac09e84 100644
--- a/module/network/
+++ b/module/network/
@@ -14,337 +14,208 @@
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
- @author: mkaay
+ @author: RaNaN
-from HTTPChunk import HTTPChunk
-from helper import *
-from os.path import exists, getsize
from os import remove
-#from shutil import move, copyfileobj
-from zlib import decompressobj, MAX_WBITS
+from time import sleep, time
+from shutil import move
-from cookielib import CookieJar
+import pycurl
-class WrongFormat(Exception):
- pass
+from HTTPRequest import HTTPRequest
+from HTTPChunk import ChunkInfo, HTTPChunk
-class ChunkInfo():
- def __init__(self, name):
- = name
- self.size = None
- self.loaded = False
- self.chunks = []
- def setSize(self, size):
- self.size = int(size)
- def addChunk(self, name, range, encoding):
- self.chunks.append((name, range, encoding))
- def clear(self):
- self.chunks = []
- self.loaded = False
- def save(self):
- fh = open("%s.chunks" %, "w")
- fh.write("name:%s\n" %
- fh.write("size:%s\n" % self.size)
- for i, c in enumerate(self.chunks):
- fh.write("#%d:\n" % i)
- fh.write("\tname:%s\n" % c[0])
- fh.write("\tencoding:%s\n" % c[2])
- fh.write("\trange:%i-%i\n" % c[1])
- @staticmethod
- def load(name):
- if not exists("%s.chunks" % name):
- raise IOError()
- fh = open("%s.chunks" % name, "r")
- name = fh.readline()[:-1]
- size = fh.readline()[:-1]
- if name.startswith("name:") and size.startswith("size:"):
- name = name[5:]
- size = size[5:]
- else:
- raise WrongFormat()
- ci = ChunkInfo(name)
- ci.loaded = True
- ci.setSize(size)
- while True:
- if not fh.readline(): #skip line
- break
- name = fh.readline()[1:-1]
- encoding = fh.readline()[1:-1]
- range = fh.readline()[1:-1]
- if name.startswith("name:") and encoding.startswith("encoding:") and range.startswith("range:"):
- name = name[5:]
- encoding = encoding[9:]
- range = range[6:].split("-")
- else:
- raise WrongFormat()
- ci.addChunk(name, (long(range[0]), long(range[1])), encoding)
- return ci
- def removeInfo(self):
- remove("%s.chunks" %
- def getCount(self):
- return len(self.chunks)
- def getChunkName(self, index):
- return self.chunks[index][0]
- def getChunkRange(self, index):
- return self.chunks[index][1]
- def getChunkEncoding(self, index):
- return self.chunks[index][2]
-class WrappedHTTPDeferred(WrappedDeferred):
- pass
-class HTTPDownload():
- def __init__(self, url, filename, get={}, post={}, referer=None, cookies=True, customHeaders={}, bucket=None, interface=None, proxies={}):
+from module.plugins.Plugin import Abort
+class HTTPDownload(HTTPRequest):
+ def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None,
+ interface=None, proxies={}):
self.url = url
- self.filename = filename
- self.interface = interface
- self.proxies = proxies
+ self.filename = filename #complete file destination, not only name
self.get = get = post
self.referer = referer
- self.cookies = cookies
- self.customHeaders = customHeaders
+ self.cj = cj #cookiejar if cookies are needed
self.bucket = bucket
- self.deferred = Deferred()
- self.finished = False
- self._abort = False
- self.size = None
- self.cookieJar = CookieJar()
+ self.interface = interface
+ self.proxies = proxies
+ # all arguments
+ self.abort = False
+ self.size = 0
self.chunks = []
self.chunksDone = 0
try: = ChunkInfo.load(filename)
+ = True #resume is only possible with valid info file
+ self.size =
except IOError: = ChunkInfo(filename)
- self.noChunkSupport = False
+ self.chunkSupport = None
+ self.m = pycurl.CurlMulti()
+ #needed for speed calculation
+ self.lastChecked = 0
+ self.lastArrived = []
+ self.speeds = []
+ @property
+ def speed(self):
+ return sum(self.speeds)
def arrived(self):
- arrived = 0
- try:
- for i in range(
- arrived += getsize( #ugly, but difficult to calc otherwise due chunk resume
- except OSError:
- arrived = self.size
- return arrived
- def setAbort(self, val):
- self._abort = val
- for chunk in self.chunks:
- chunk.abort = val
- def getAbort(self):
- return self._abort
- abort = property(getAbort, setAbort)
- def getSpeed(self):
- speed = 0
- for chunk in self.chunks:
- speed += chunk.getSpeed()
- return speed
+ return sum([c.arrived for c in self.chunks])
- def speed(self):
- return self.getSpeed()
- def calcProgress(self, p):
- self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100))
- def _chunkDone(self):
- self.chunksDone += 1
- #print self.chunksDone, "/", len(self.chunks)
- if self.chunksDone == len(self.chunks):
- self._copyChunks()
+ def percent(self):
+ if not self.size: return 0
+ return (self.arrived * 100) / self.size
def _copyChunks(self):
- fo = open(self.filename, "wb") #out file
- for i in range(
- encoding =
- #decompress method, if any
- decompress = lambda data: data
- if encoding == "gzip":
- gz = decompressobj(16+MAX_WBITS)
- decompress = lambda data: gz.decompress(data)
- if encoding == "deflate":
- df = decompressobj(-MAX_WBITS)
- decompress = lambda data: df.decompress(data)
- #input file
- fname = "%s.chunk%d" % (self.filename, i)
- fi = open(fname, "rb")
- while True: #copy in chunks, consumes less memory
- data =*1024)
- if not data:
- break
- fo.write(decompress(data)) #decompressing
- fi.close()
- remove(fname) #remove
- fo.close()
- #remove info file
- self.deferred.callback() #done, emit callbacks
- def _createChunk(self, fh, range=None):
- chunk = HTTPChunk(self.url, fh, get=self.get,,
- referer=self.referer, cookies=self.cookies,
- customHeaders=self.customHeaders,
- bucket=self.bucket, range=range,
- interface=self.interface, proxies=self.proxies)
- chunk.cookieJar = self.cookieJar
- return chunk
- def _addChunk(self, chunk, d):
- self.chunks.append(chunk)
- d.addProgress("percent", self.calcProgress)
- d.addCallback(self._chunkDone)
- d.addErrback(lambda *args, **kwargs: self.setAbort(True))
- d.addErrback(self.deferred.error)
+ init = #initial chunk name
+ if len(self.chunks) > 1:
+ fo = open(init, "rb+") #first chunkfile
+ for i in range(1,
+ #input file
+ - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks
+ fname = "%s.chunk%d" % (self.filename, i)
+ fi = open(fname, "rb")
+ buf = 512 * 1024
+ while True: #copy in chunks, consumes less memory
+ data =
+ if not data:
+ break
+ fo.write(data)
+ fi.close()
+ remove(fname) #remove chunk
+ fo.close()
+ move(init, self.filename)
+ #remove info file
def download(self, chunks=1, resume=False):
- self.chunksDone = 0
- if chunks > 0:
- #different chunk count in info, resetting
- if and not == chunks:
- #if resuming, calculate range with offset
- crange = None
- if resume:
- if == chunks and exists("%s.chunk0" % (self.filename, )):
- crange =
- crange = (crange[0]+getsize("%s.chunk0" % (self.filename, )), crange[1]-1)
- #if firstpart not done
- if crange is None or crange[1]-crange[0] > 0:
- fh = open("%s.chunk0" % (self.filename, ), "ab" if crange else "wb")
- chunk = self._createChunk(fh, range=crange)
- d = #start downloading
- self._addChunk(chunk, d)
- #no info file, need to calculate ranges
- if not
- size = chunk.size #overall size
- chunksize = size/chunks #chunk size
- chunk.range = (0, chunksize) #setting range for first part
- chunk.noRangeHeader = True
- chunk.size = chunksize #setting size for first chunk
- self.size = size #setting overall size
- #saving overall size
-"%s.chunk0" % (self.filename, ), chunk.range, chunk.getEncoding()) #add chunk to infofile
+ chunks = max(1, chunks)
+ resume = and resume
+ self.chunks = []
+ try:
+ self._download(chunks, resume)
+ finally:
+ self.clean()
+ def _download(self, chunks, resume):
+ if not resume:
+"%s.chunk0" % self.filename, (0, 0))
+ init = HTTPChunk(0, self, None, resume) #initial chunk that will load complete file (if needed)
+ self.chunks.append(init)
+ self.m.add_handle(init.getHandle())
+ while 1:
+ #need to create chunks
+ if len(self.chunks) < chunks and self.chunkSupport and self.size: #will be set later by first chunk
+ if not resume:
+ chunks =
+ init.range =
+ for i in range(1, chunks):
+ c = HTTPChunk(i, self,, resume)
+ self.chunks.append(c)
+ self.m.add_handle(c.getHandle())
+ while 1:
+ ret, num_handles = self.m.perform()
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+ while 1:
+ num_q, ok_list, err_list = self.m.info_read()
+ for c in ok_list:
+ self.chunksDone += 1
+ for c in err_list:
+ curl, errno, msg = c
+ #test if chunk was finished, otherwise raise the exception
+ if errno != 23 or "0 !=" not in msg:
+ raise pycurl.error(errno, msg)
+ #@TODO KeyboardInterrupts are seen as finished chunks,
+ #but normally not handed to this process, only in the testcase
- lastchunk = size - chunksize*(chunks-1) #calculating size for last chunk
- self.firstchunk = chunk #remember first chunk
- if and not self.size:
- self.size = #setting overall size
- for i in range(1, chunks): #other chunks
- cont = False
- if not #first time load
- if i+1 == chunks: #last chunk?
- rng = (i*chunksize, i*chunksize+lastchunk-1)
- else:
- rng = (i*chunksize, (i+1)*chunksize-1) #adjusting range
- else: #info available
- rng = #loading previous range
- if resume and exists("%s.chunk%d" % (self.filename, i)): #continue chunk
- rng = (rng[0]+getsize("%s.chunk%d" % (self.filename, i)), rng[1]) #adjusting offset
- cont = True #set append mode
- if rng[1]-rng[0] <= 0: #chunk done
- continue
- fh = open("%s.chunk%d" % (self.filename, i), "ab" if cont else "wb")
- chunk = self._createChunk(fh, range=rng)
- d = #try
- if not chunk.resp.getcode() == 206 and i == 1: #no range supported, tell first chunk to download everything
- chunk.abort = True
- self.noChunkSupport = True
- self.firstchunk.size = self.size
- self.firstchunk.range = None
- #clear info
-"%s.chunk0" % (self.filename, ), (0, self.firstchunk.size), chunk.getEncoding()) #re-adding info with correct ranges
+ self.chunksDone += 1
+ if not num_q:
- self._addChunk(chunk, d)
- if not #adding info
-"%s.chunk%d" % (self.filename, i), chunk.range, chunk.getEncoding())
- #saving info
- if not len(self.chunks):
- self._copyChunks()
- return WrappedHTTPDeferred(self, self.deferred)
- else:
- raise Exception("no chunks")
+ if self.chunksDone == len(self.chunks):
+ break #all chunks loaded
+ # calc speed once per second
+ t = time()
+ if self.lastChecked + 1 < t:
+ diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in
+ enumerate(self.chunks)]
+ #for i, c in enumerate(self.chunks):
+ # diff[i] = c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0)
+ self.speeds = [float(a) / (t - self.lastChecked) for a in diff]
+ self.lastArrived = [c.arrived for c in self.chunks]
+ self.lastChecked = t
+ #print "------------------------"
+ #print self.speed / 1024, "kb/s"
+ #print "Arrived:", self.arrived
+ #print "Size:", self.size
+ #print self.percent, "%"
+ if self.abort:
+ raise Abort()
+ sleep(0.001) #suppress busy waiting - limits dl speed to (1 / x) * buffersize
+ for chunk in self.chunks:
+ chunk.fp.close()
+ self.m.remove_handle(chunk.c)
+ self._copyChunks()
def clean(self):
""" cleanup """
- for c in self.chunks:
- c.clean()
+ for chunk in self.chunks:
+ chunk.close()
+ self.m.remove_handle(chunk.c)
+ self.m.close()
+ self.chunks = []
+ if hasattr(self, "cj"):
+ del self.cj
+ if hasattr(self, "info"):
+ del
if __name__ == "__main__":
- import sys
+ url = ""
from Bucket import Bucket
bucket = Bucket()
- bucket.setRate(200*1024)
- #bucket = None
- url = ""
- finished = False
- def err(*a, **b):
- print a, b
- def callb(*a, **b):
- global finished
- finished = True
- print a, b
+ bucket.setRate(200 * 1024)
+ bucket = None
print "starting"
- dwnld = HTTPDownload(url, "test_100mb.bin", bucket=bucket)
- d =, resume=True)
- d.addCallback(callb)
- d.addErrback(err)
- try:
- while True:
- for a, chunk in enumerate(dwnld.chunks):
- if not chunk.finished:
- print "#%d" % a, chunk.getSpeed()/1024, "kb/s", "size", int(float(chunk.arrived)/chunk.size*100), "%"
- else:
- print "#%d" % a, "finished"
- print "sum", dwnld.speed/1024, dwnld.arrived, "/", dwnld.size, int(float(dwnld.arrived)/dwnld.size*100), "%"
- if finished:
- print "- finished"
- break
- sleep(1)
- except KeyboardInterrupt:
- dwnld.abort = True
- sys.exit()
+ dwnld = HTTPDownload(url, "test_10mb.bin", bucket=bucket)
+, resume=True)
\ No newline at end of file