path: root/pyload/network/
diff options
authorGravatar RaNaN <> 2013-06-09 18:10:22 +0200
committerGravatar RaNaN <> 2013-06-09 18:10:23 +0200
commit16af85004c84d0d6c626b4f8424ce9647669a0c1 (patch)
tree025d479862d376dbc17e934f4ed20031c8cd97d1 /pyload/network/
parentadapted to jshint config (diff)
moved everything from module to pyload
Diffstat (limited to 'pyload/network/')
1 files changed, 338 insertions, 0 deletions
diff --git a/pyload/network/ b/pyload/network/
new file mode 100644
index 000000000..04bf2363a
--- /dev/null
+++ b/pyload/network/
@@ -0,0 +1,338 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ See the GNU General Public License for more details.
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <>.
+ @author: RaNaN
+from os import remove
+from os.path import dirname
+from time import time
+from shutil import move
+from logging import getLogger
+import pycurl
+from HTTPChunk import ChunkInfo, HTTPChunk
+from HTTPRequest import BadHeader
+from pyload.plugins.Base import Abort
+from pyload.utils.fs import save_join, fs_encode
+# TODO: save content-disposition for resuming
+class HTTPDownload():
+ """ loads an url, http + ftp supported """
+ def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None,
+ options={}, disposition=False):
+ self.url = url
+ self.filename = filename #complete file destination, not only name
+ self.get = get
+ = post
+ self.referer = referer
+ self.cj = cj #cookiejar if cookies are needed
+ self.bucket = bucket
+ self.options = options
+ self.disposition = disposition
+ # all arguments
+ self.abort = False
+ self.size = 0
+ self._name = ""# will be parsed from content disposition
+ self.chunks = []
+ self.log = getLogger("log")
+ try:
+ = ChunkInfo.load(filename)
+ = True #resume is only possible with valid info file
+ self.size =
+ self.infoSaved = True
+ except IOError:
+ = ChunkInfo(filename)
+ self.chunkSupport = None
+ self.m = pycurl.CurlMulti()
+ #needed for speed calculation
+ self.lastArrived = []
+ self.speeds = []
+ self.lastSpeeds = [0, 0]
+ @property
+ def speed(self):
+ last = [sum(x) for x in self.lastSpeeds if x]
+ return (sum(self.speeds) + sum(last)) / (1 + len(last))
+ @property
+ def arrived(self):
+ return sum([c.arrived for c in self.chunks])
+ @property
+ def percent(self):
+ if not self.size: return 0
+ return (self.arrived * 100) / self.size
+ @property
+ def name(self):
+ return self._name if self.disposition else ""
+ def _copyChunks(self):
+ init = fs_encode( #initial chunk name
+ if > 1:
+ fo = open(init, "rb+") #first chunkfile
+ for i in range(1,
+ #input file
+ - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks
+ fname = fs_encode("%s.chunk%d" % (self.filename, i))
+ fi = open(fname, "rb")
+ buf = 32 * 1024
+ while True: #copy in chunks, consumes less memory
+ data =
+ if not data:
+ break
+ fo.write(data)
+ fi.close()
+ if fo.tell() <[1]:
+ fo.close()
+ remove(init)
+ #there are probably invalid chunks
+ raise Exception("Downloaded content was smaller than expected. Try to reduce download connections.")
+ remove(fname) #remove chunk
+ fo.close()
+ if
+ self.filename = save_join(dirname(self.filename),
+ move(init, fs_encode(self.filename))
+ #remove info file
+ def download(self, chunks=1, resume=False):
+ """ returns new filename or None """
+ chunks = max(1, chunks)
+ resume = and resume
+ try:
+ self._download(chunks, resume)
+ except pycurl.error, e:
+ #code 33 - no resume
+ code = e.args[0]
+ if code == 33:
+ # try again without resume
+ self.log.debug("Errno 33 -> Restart without resume")
+ #remove old handles
+ for chunk in self.chunks:
+ self.closeChunk(chunk)
+ return self._download(chunks, False)
+ else:
+ raise
+ finally:
+ self.close()
+ return
+ def _download(self, chunks, resume):
+ if not resume:
+"%s.chunk0" % self.filename, (0, 0)) #create an initial entry
+ self.chunks = []
+ init = HTTPChunk(0, self, None, resume) #initial chunk that will load complete file (if needed)
+ self.chunks.append(init)
+ self.m.add_handle(init.getHandle())
+ lastFinishCheck = 0
+ lastTimeCheck = 0
+ chunksDone = set() # list of curl handles that are finished
+ chunksCreated = False
+ done = False
+ if > 1: # This is a resume, if we were chunked originally assume still can
+ self.chunkSupport = True
+ while 1:
+ #need to create chunks
+ if not chunksCreated and self.chunkSupport and self.size: #will be set later by first chunk
+ if not resume:
+ chunks =
+ init.setRange(
+ for i in range(1, chunks):
+ c = HTTPChunk(i, self,, resume)
+ handle = c.getHandle()
+ if handle:
+ self.chunks.append(c)
+ self.m.add_handle(handle)
+ else:
+ #close immediately
+ self.log.debug("Invalid curl handle -> closed")
+ c.close()
+ chunksCreated = True
+ while 1:
+ ret, num_handles = self.m.perform()
+ if ret != pycurl.E_CALL_MULTI_PERFORM:
+ break
+ t = time()
+ # reduce these calls
+ # when num_q is 0, the loop is exited
+ while lastFinishCheck + 0.5 < t:
+ # list of failed curl handles
+ failed = []
+ ex = None # save only last exception, we can only raise one anyway
+ num_q, ok_list, err_list = self.m.info_read()
+ for c in ok_list:
+ chunk = self.findChunk(c)
+ try: # check if the header implies success, else add it to failed list
+ chunk.verifyHeader()
+ except BadHeader, e:
+ self.log.debug("Chunk %d failed: %s" % ( + 1, str(e)))
+ failed.append(chunk)
+ ex = e
+ else:
+ chunksDone.add(c)
+ for c in err_list:
+ curl, errno, msg = c
+ chunk = self.findChunk(curl)
+ #test if chunk was finished
+ if errno != 23 or "0 !=" not in msg:
+ failed.append(chunk)
+ ex = pycurl.error(errno, msg)
+ self.log.debug("Chunk %d failed: %s" % ( + 1, str(ex)))
+ continue
+ try: # check if the header implies success, else add it to failed list
+ chunk.verifyHeader()
+ except BadHeader, e:
+ self.log.debug("Chunk %d failed: %s" % ( + 1, str(e)))
+ failed.append(chunk)
+ ex = e
+ else:
+ chunksDone.add(curl)
+ if not num_q: # no more info to get
+ # check if init is not finished so we reset download connections
+ # note that other chunks are closed and everything downloaded with initial connection
+ if failed and init not in failed and init.c not in chunksDone:
+ self.log.error(_("Download chunks failed, fallback to single connection | %s" % (str(ex))))
+ #list of chunks to clean and remove
+ to_clean = filter(lambda x: x is not init, self.chunks)
+ for chunk in to_clean:
+ self.closeChunk(chunk)
+ self.chunks.remove(chunk)
+ remove(fs_encode(
+ #let first chunk load the rest and update the info file
+ init.resetRange()
+"%s.chunk0" % self.filename, (0, self.size))
+ elif failed:
+ raise ex
+ lastFinishCheck = t
+ if len(chunksDone) >= len(self.chunks):
+ if len(chunksDone) > len(self.chunks):
+ self.log.warning("Finished download chunks size incorrect, please report bug.")
+ done = True #all chunks loaded
+ break
+ if done:
+ break #all chunks loaded
+ # calc speed once per second, averaging over 3 seconds
+ if lastTimeCheck + 1 < t:
+ diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in
+ enumerate(self.chunks)]
+ self.lastSpeeds[1] = self.lastSpeeds[0]
+ self.lastSpeeds[0] = self.speeds
+ self.speeds = [float(a) / (t - lastTimeCheck) for a in diff]
+ self.lastArrived = [c.arrived for c in self.chunks]
+ lastTimeCheck = t
+ if self.abort:
+ raise Abort()
+ for chunk in self.chunks:
+ chunk.flushFile() #make sure downloads are written to disk
+ self._copyChunks()
+ def findChunk(self, handle):
+ """ linear search to find a chunk (should be ok since chunk size is usually low) """
+ for chunk in self.chunks:
+ if chunk.c == handle: return chunk
+ def closeChunk(self, chunk):
+ try:
+ self.m.remove_handle(chunk.c)
+ except pycurl.error, e:
+ self.log.debug("Error removing chunk: %s" % str(e))
+ finally:
+ chunk.close()
+ def close(self):
+ """ cleanup """
+ for chunk in self.chunks:
+ self.closeChunk(chunk)
+ self.chunks = []
+ if hasattr(self, "m"):
+ self.m.close()
+ del self.m
+ if hasattr(self, "cj"):
+ del self.cj
+ if hasattr(self, "info"):
+ del
+if __name__ == "__main__":
+ url = ""
+ from Bucket import Bucket
+ bucket = Bucket()
+ bucket.setRate(200 * 1024)
+ bucket = None
+ print "starting"
+ dwnld = HTTPDownload(url, "test_100mb.bin", bucket=bucket)
+, resume=True)