summaryrefslogtreecommitdiffstats
path: root/module/network/HTTPChunk.py
diff options
context:
space:
mode:
authorGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-12-27 21:18:29 +0100
committerGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2010-12-27 21:18:29 +0100
commit9509a6444bbb538e136ed899d94aab32be629383 (patch)
treeac8532b20912a3e5be6ff73443520a7f31f5806a /module/network/HTTPChunk.py
parentencoding fix (diff)
downloadpyload-9509a6444bbb538e136ed899d94aab32be629383.tar.xz
new curl download backend - support for chunked dl, resume
Diffstat (limited to 'module/network/HTTPChunk.py')
-rw-r--r--module/network/HTTPChunk.py353
1 files changed, 176 insertions, 177 deletions
diff --git a/module/network/HTTPChunk.py b/module/network/HTTPChunk.py
index 02134ca63..0c184db94 100644
--- a/module/network/HTTPChunk.py
+++ b/module/network/HTTPChunk.py
@@ -14,185 +14,184 @@
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
- @author: mkaay
+ @author: RaNaN
"""
-
-from HTTPBase import HTTPBase
-from urllib2 import HTTPError
-from helper import *
+from os import remove
+from os.path import exists
from time import sleep
-from traceback import print_exc
-from module.plugins.Plugin import Abort
-from module.plugins.Plugin import Fail
-
-class HTTPChunk(HTTPBase):
- def __init__(self, url, fh, get={}, post={}, referer=None, cookies=True, customHeaders={}, range=None, bucket=None, interface=None, proxies={}):
- HTTPBase.__init__(self, interface=interface, proxies=proxies)
-
- self.url = url
- self.bucket = bucket
- self.range = range
- self.noRangeHeader = False
- self.fh = fh
-
- self.get = get
- self.post = post
- self.referer = referer
- self.cookies = cookies
- self.customHeaders = customHeaders
-
- self.deferred = Deferred()
-
- self.abort = False
- self.finished = False
-
- self.arrived = 0
-
- self.startTime = None
- self.endTime = None
-
- self.speed = 0 #byte/sec
- self.speedCalcTime = None
- self.speedCalcLen = 0
-
- self.bufferSize = 18*1024 #tune if performance is poor
- self.resp = None
-
- def getSpeed(self):
- return self.speed
-
- @threaded
- def _download(self, resp):
- self.arrived = 0
- self.lastSpeed = self.startTime = inttime()
-
- if self.noRangeHeader and not self.range[0] == 0:
- self.deferred.error("range starts not at 0")
-
- running = True
- while running:
- if self.abort:
- break
- count = self.bufferSize
- if self.noRangeHeader:
- count = min(count, self.range[1] - self.arrived)
- if self.bucket:
- count = self.bucket.add(count)
- if not count:
- sleep(0.01)
- continue
-
- try:
- data = resp.read(count)
- except:
- self.deferred.error(Fail, "timeout")
- break
-
- if self.speedCalcTime < inttime():
- self.speed = self.speedCalcLen
- self.speedCalcTime = inttime()
- self.speedCalcLen = 0
- try:
- self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100))
- except:
- pass
- size = len(data)
-
- self.arrived += size
- self.speedCalcLen += size
-
- if self.noRangeHeader and self.arrived == self.range[1]:
- running = False
-
- if size:
- self.fh.write(data)
- else:
- break
-
- self.speed = 0
- self.endTime = inttime()
- self.finished = True
- self.fh.close()
-
- if self.abort:
- self.deferred.error(Abort)
- elif self.size == self.arrived:
- self.deferred.callback()
- else:
- print self.arrived, self.size
- self.deferred.error(Fail, "wrong content-length")
-
- def getEncoding(self):
- try:
- if self.resp.info()["Content-Encoding"] in ("gzip", "deflate"):
- return self.resp.info()["Content-Encoding"]
- except:
- pass
- return "plain"
-
- def download(self):
- if self.range:
- self.customHeaders["Range"] = "bytes=%i-%i" % self.range
- try:
- resp = self.getResponse(self.url, self.get, self.post, self.referer, self.cookies, self.customHeaders)
- self.resp = resp
- except HTTPError, e:
- print_exc()
- self.deferred.error(e)
- return self.deferred
-
- if resp.getcode() in (200, 206):
- self._download(resp)
+
+import pycurl
+
+from HTTPRequest import HTTPRequest
+
class WrongFormat(Exception):
    """Raised when a ``.chunks`` info file does not have the expected layout."""
+
class ChunkInfo():
    """Describes how one download is split into chunk files.

    The description can be persisted to ``<name>.chunks`` next to the
    download. The file layout written by :meth:`save` is::

        name:<name>
        size:<size>
        #<index>:
        <TAB>name:<chunk file name>
        <TAB>range:<start>-<end>

    with the last three lines repeated once per chunk.
    """

    def __init__(self, name):
        self.name = name    # base file name; also prefix of every chunk file
        self.size = 0       # total size in bytes
        self.resume = False
        self.chunks = []    # list of (chunk file name, (start, end)) tuples

    def setSize(self, size):
        """Store the total size; accepts anything ``int()`` can convert."""
        self.size = int(size)

    def addChunk(self, name, range):
        """Append a chunk given its file name and ``(start, end)`` tuple."""
        self.chunks.append((name, range))

    def clear(self):
        """Forget all chunks."""
        self.chunks = []

    def createChunks(self, chunks):
        """Replace the chunk list with ``chunks`` consecutive byte ranges.

        Ranges are inclusive on both ends; the last one always ends at
        ``size - 1`` so that rounding never loses trailing bytes.
        NOTE(review): ``size`` is not validated against ``chunks``; a size
        smaller than the chunk count yields degenerate ranges.
        """
        self.clear()
        # Explicit floor division: plain "/" yields a float as soon as true
        # division is in effect, which would corrupt the "%i" range output.
        chunk_size = self.size // chunks

        current = 0
        for i in range(chunks):
            end = self.size - 1 if (i == chunks - 1) else current + chunk_size
            self.addChunk("%s.chunk%s" % (self.name, i), (current, end))
            current += chunk_size + 1

    def save(self):
        """Write the chunk description to ``<name>.chunks``."""
        fh = open("%s.chunks" % self.name, "w")
        try:
            fh.write("name:%s\n" % self.name)
            fh.write("size:%s\n" % self.size)
            for i, c in enumerate(self.chunks):
                fh.write("#%d:\n" % i)
                fh.write("\tname:%s\n" % c[0])
                fh.write("\trange:%i-%i\n" % c[1])
        finally:
            # Close even when a write fails so the handle is never leaked.
            fh.close()

    @staticmethod
    def load(name):
        """Read ``<name>.chunks`` and return the ChunkInfo it describes.

        Raises IOError when the file cannot be opened and WrongFormat when
        the expected ``name:``/``size:``/``range:`` prefixes are missing.
        The returned instance additionally has ``loaded`` set to True.
        """
        # open() itself raises IOError for a missing file, so no separate
        # existence check (and no check-then-open race) is needed.
        fh = open("%s.chunks" % name, "r")
        try:
            # rstrip instead of [:-1]: a final line without a newline must
            # not lose its last real character.
            name_line = fh.readline().rstrip("\n")
            size_line = fh.readline().rstrip("\n")
            if not (name_line.startswith("name:")
                    and size_line.startswith("size:")):
                raise WrongFormat()

            ci = ChunkInfo(name_line[5:])
            ci.loaded = True
            ci.setSize(size_line[5:])

            while True:
                # The "#<index>:" separator carries no data; EOF here means
                # the chunk list is complete.
                if not fh.readline():
                    break
                # [1:] drops the leading tab written by save().
                chunk_name = fh.readline()[1:].rstrip("\n")
                chunk_range = fh.readline()[1:].rstrip("\n")
                if not (chunk_name.startswith("name:")
                        and chunk_range.startswith("range:")):
                    raise WrongFormat()

                bounds = chunk_range[6:].split("-")
                # int() promotes to long automatically on Python 2.
                ci.addChunk(chunk_name[5:], (int(bounds[0]), int(bounds[1])))
            return ci
        finally:
            # Single exit point for the handle, including every error path.
            fh.close()

    def remove(self):
        """Delete the ``<name>.chunks`` file if it exists."""
        info_file = "%s.chunks" % self.name
        if exists(info_file):
            remove(info_file)

    def getCount(self):
        """Return the number of chunks."""
        return len(self.chunks)

    def getChunkName(self, index):
        """Return the file name of chunk ``index``."""
        return self.chunks[index][0]

    def getChunkRange(self, index):
        """Return the ``(start, end)`` tuple of chunk ``index``."""
        return self.chunks[index][1]
+
class HTTPChunk(HTTPRequest):
    """A single curl transfer that fetches one chunk of a parent download.

    Request parameters (url, get, post, referer, cookie jar, interface,
    proxies, bucket) and the chunk file names are all read from the parent.
    """

    def __init__(self, id, parent, range=None, resume=False):
        """
        id     -- index of this chunk in the parent's ChunkInfo
        parent -- HTTPDownload instance owning this chunk
        range  -- inclusive (start, end) byte tuple, or None for the whole file
        resume -- append to an existing chunk file instead of truncating it
        """
        self.id = id
        self.p = parent  # HTTPDownload instance
        self.range = range  # tuple (start, end)
        self.resume = resume

        self.arrived = 0  # bytes of this chunk present on disk so far
        self.lastURL = self.p.referer

        self.c = pycurl.Curl()

        self.header = ""
        self.headerParsed = False  # indicates if the header has been processed

        self.fp = None  # file handle

        self.initHandle()
        self.setInterface(self.p.interface, self.p.proxies)

    @property
    def cj(self):
        """Cookie jar, shared with the parent download."""
        return self.p.cj

    def getHandle(self):
        """ returns a Curl handle ready to use for perform/multiperform """
        # Function-scope import: getsize is not among the file-level imports.
        from os.path import getsize

        self.setRequestContext(self.p.url, self.p.get, self.p.post, self.p.referer, self.p.cj)
        self.c.setopt(pycurl.WRITEFUNCTION, self.writeBody)
        self.c.setopt(pycurl.HEADERFUNCTION, self.writeHeader)

        chunk_file = self.p.info.getChunkName(self.id)

        if self.resume:
            self.fp = open(chunk_file, "ab")
            self.arrived = self.fp.tell()
            if not self.arrived:
                # The initial position of a stream opened for append is
                # implementation-defined and may be reported as 0 before the
                # first write. Fall back to the on-disk size so the transfer
                # continues where the file ends instead of re-requesting
                # (and appending again) data that is already there.
                self.arrived = getsize(chunk_file)

            if self.range:
                # chunked resume: skip the part of the range already on disk
                self.c.setopt(pycurl.RANGE, "%i-%i" % (self.arrived + self.range[0], self.range[1]))
            else:
                # unchunked resume: continue the whole file from the offset
                self.c.setopt(pycurl.RESUME_FROM, self.arrived)

        else:
            if self.range:
                self.c.setopt(pycurl.RANGE, "%i-%i" % self.range)

            self.fp = open(chunk_file, "wb")

        return self.c

    def writeHeader(self, buf):
        """curl header callback: collect header data as it arrives."""
        self.header += buf
        #@TODO forward headers?, this is possibly unneeded, when we just parse valid 200 headers
        # Only the chunk without a range (the first one) parses the headers,
        # and only once the terminating blank line has been received.
        if self.header.endswith("\r\n\r\n") and not self.range:
            self.parseHeader()

    def writeBody(self, buf):
        """curl body callback: write data to the chunk file and throttle.

        Returns 0 once the range is complete; curl aborts a transfer whose
        write callback reports fewer bytes than it was given. Otherwise
        returns None, which curl treats as "everything consumed".
        """
        size = len(buf)
        self.arrived += size

        self.fp.write(buf)

        if self.p.bucket:
            # presumably consumed() returns the number of seconds to wait to
            # honour the speed limit -- confirm against the Bucket class
            sleep(self.p.bucket.consumed(size))

        # range is inclusive, so the chunk holds (end - start + 1) bytes
        if self.range and self.arrived > (self.range[1] - self.range[0]):
            return 0  # close if we have enough data

    def parseHeader(self):
        """parse data from received header"""
        for line in self.header.splitlines():
            line = line.strip().lower()
            if line.startswith("accept-ranges") and "bytes" in line:
                self.p.chunkSupport = True

            # On resume the content-length is not taken as the total size.
            if not self.resume and line.startswith("content-length"):
                self.p.size = int(line.split(":")[1])

        self.headerParsed = True

    def close(self):
        """ closes everything, unusable after this """
        if self.fp:
            self.fp.close()
        self.c.close()
        # drop the reference to the parent
        if hasattr(self, "p"):
            del self.p