diff options
Diffstat (limited to 'module/network')
-rw-r--r-- | module/network/FTPBase.py | 208 | ||||
-rw-r--r-- | module/network/HTTPChunk.py | 31 | ||||
-rw-r--r-- | module/network/HTTPDownload.py | 22 |
3 files changed, 36 insertions, 225 deletions
diff --git a/module/network/FTPBase.py b/module/network/FTPBase.py deleted file mode 100644 index d8fc5a20d..000000000 --- a/module/network/FTPBase.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, see <http://www.gnu.org/licenses/>. - - @author: mkaay -""" - -from ftplib import FTP - -import socket -import socks - -from os.path import getsize -from urlparse import urlparse -from urllib2 import _parse_proxy - -from helper import * - -class FTPBase(FTP): - sourceAddress = ('', 0) - - def setSourceAddress(self, host): - self.sourceAddress = (host, 0) - - def connect(self, host='', port=0, timeout=30, proxies={}): - if host != '': - self.host = host - if port > 0: - self.port = port - self.timeout = timeout - - proxytype = None - proxy = None - if "socks5" in proxies: - proxytype = socks.PROXY_TYPE_SOCKS5 - proxy = proxies["socks5"] - elif "socks4" in proxies: - proxytype = socks.PROXY_TYPE_SOCKS4 - proxy = proxies["socks4"] - if proxytype: - self.sock = socks.socksocket() - t = _parse_proxy(proxy) - self.sock.setproxy(proxytype, addr=t[3].split(":")[0], port=int(t[3].split(":")[1]), username=t[1], password=t[2]) - else: - self.sock = socket.socket() - self.sock.settimeout(self.timeout) - self.sock.bind(self.sourceAddress) - self.sock.connect((self.host, self.port)) - self.af = self.sock.family - self.file = self.sock.makefile('rb') - self.welcome = self.getresp() - return self.welcome - -class WrappedFTPDeferred(WrappedDeferred): - pass - -class FTPDownload(): - def __init__(self, url, filename, interface=None, bucket=None, proxies={}): - self.url = url - self.filename = filename - - self.bucket = bucket - self.interface = interface - self.proxies = proxies - - self.deferred = Deferred() - - self.finished = False - self.size = None - - self.speed = 0 - - self.abort = False - - self.arrived = 0 - - self.startTime = None - self.endTime = None - - self.speed = 0 #byte/sec - self.speedCalcTime = None - self.speedCalcLen = 0 - - self.bufferSize = 16*1024 #tune if performance is poor - - self.ftp = FTPBase() - self.fh = None - - @threaded - def _download(self, offset): - remotename = self.url.split("/")[-1] - cmd = "RETR %s" % remotename - - self.startTime = inttime() - self.arrived = offset - conn, size = self.ftp.ntransfercmd(cmd, None if offset == 0 else offset) #explicit None - if size: - self.size = size + offset - while True: - if self.abort: - self.ftp.abort() - break - count = self.bufferSize - if self.bucket: - count = self.bucket.add(count) - if count == 0: - sleep(0.01) - continue - - try: - data = conn.recv(count) - except: - self.deferred.error("timeout") - - if self.speedCalcTime < inttime(): - self.speed = self.speedCalcLen - self.speedCalcTime = inttime() - self.speedCalcLen = 0 - try: - self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100)) - except: - pass - size = len(data) - self.speedCalcLen += size - self.arrived += size - - if not data: - break - - self.fh.write(data) - self.fh.close() - conn.close() - self.endTime = inttime() - if not self.abort: - print self.ftp.voidresp() #debug - - self.ftp.quit() - if self.abort: - self.deferred.error("abort") - elif self.size is None or self.size == self.arrived: - self.deferred.callback() - else: - self.deferred.error("wrong content lenght") - - def download(self, resume=False): - self.fh = open("%s.part" % self.filename, "ab" if resume else "wb") - offset = 0 - if resume: - offset = getsize("%s.part" % self.filename) - - up = urlparse(self.url) - - self.ftp.connect(up.hostname, up.port if up.port else 21, proxies=self.proxies) - self.ftp.login(up.username, up.password) - self.ftp.cwd("/".join(up.path.split("/")[:-1])) - self.ftp.voidcmd('TYPE I') - self.size = self.ftp.size(self.url.split("/")[-1]) - - self._download(offset) - return WrappedFTPDeferred(self, self.deferred) - -if __name__ == "__main__": - import sys - from Bucket import Bucket - bucket = Bucket() - bucket.setRate(200*1000) - #bucket = None - - url = "ftp://mirror.sov.uk.goscomb.net/ubuntu-releases/maverick/ubuntu-10.10-desktop-i386.iso" - - finished = False - def err(*a, **b): - print a, b - def callb(*a, **b): - global finished - finished = True - print a, b - - print "starting" - - dwnld = FTPDownload(url, "ubuntu_ftp.iso") - d = dwnld.download(resume=True) - d.addCallback(callb) - d.addErrback(err) - - try: - while True: - if not dwnld.finished: - print dwnld.speed/1024, "kb/s", "size", dwnld.arrived, "/", dwnld.size#, int(float(dwnld.arrived)/dwnld.size*100), "%" - if finished: - print "- finished" - break - sleep(1) - except KeyboardInterrupt: - dwnld.abort = True - sys.exit() diff --git a/module/network/HTTPChunk.py b/module/network/HTTPChunk.py index 440115c77..680b982d3 100644 --- a/module/network/HTTPChunk.py +++ b/module/network/HTTPChunk.py @@ -28,6 +28,7 @@ from HTTPRequest import HTTPRequest class WrongFormat(Exception): pass + class ChunkInfo(): def __init__(self, name): self.name = name @@ -115,6 +116,7 @@ class ChunkInfo(): def getChunkRange(self, index): return self.chunks[index][1] + class HTTPChunk(HTTPRequest): def __init__(self, id, parent, range=None, resume=False): self.id = id @@ -123,6 +125,7 @@ class HTTPChunk(HTTPRequest): self.resume = resume self.log = parent.log + self.size = range[1] - range[0] if range else -1 self.arrived = 0 self.lastURL = self.p.referer @@ -136,11 +139,13 @@ class HTTPChunk(HTTPRequest): self.initHandle() self.setInterface(self.p.options["interface"], self.p.options["proxies"], self.p.options["ipv6"]) - self.BOMChecked = False - # check and remove byte order mark + self.BOMChecked = False # check and remove byte order mark self.rep = None + self.sleep = 0.000 + self.lastSize = 0 + @property def cj(self): return self.p.cj @@ -218,11 +223,21 @@ class HTTPChunk(HTTPRequest): if self.p.bucket: sleep(self.p.bucket.consumed(size)) - elif size < 5000: #@TODO nice to have: traffic sharping algr. which calculates sleep time to reduce cpu load - #sleep if chunk size gets low, to avoid many function calls and hope chunksize gets bigger - sleep(0.007) + else: + # Avoid small buffers, increasing sleep time slowly if buffer size gets smaller + # otherwise reduce sleep time percentual (values are based on tests) + # So in general cpu time is saved without reducing bandwith too much - if self.range and self.arrived > (self.range[1] - self.range[0]): + if size < self.lastSize: + self.sleep += 0.002 + else: + self.sleep *= 0.7 + + self.lastSize = size + + sleep(self.sleep) + + if self.range and self.arrived > self.size: return 0 #close if we have enough data @@ -244,6 +259,10 @@ class HTTPChunk(HTTPRequest): self.headerParsed = True + def setRange(self, range): + self.range = range + self.size = range[1] - range[0] + def close(self): """ closes everything, unusable after this """ if self.fp: self.fp.close() diff --git a/module/network/HTTPDownload.py b/module/network/HTTPDownload.py index ac3252f68..f616b16b5 100644 --- a/module/network/HTTPDownload.py +++ b/module/network/HTTPDownload.py @@ -33,6 +33,7 @@ from module.utils import save_join class HTTPDownload(): """ loads a url http + ftp """ + def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None, options={}, progressNotify=None, disposition=False): self.url = url @@ -69,7 +70,7 @@ class HTTPDownload(): self.lastArrived = [] self.speeds = [] self.lastSpeeds = [0, 0] - + self.progressNotify = progressNotify @property @@ -93,7 +94,8 @@ class HTTPDownload(): fo = open(init, "rb+") #first chunkfile for i in range(1, self.info.getCount()): #input file - fo.seek(self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks + fo.seek( + self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks fname = "%s.chunk%d" % (self.filename, i) fi = open(fname, "rb") buf = 32 * 1024 @@ -113,7 +115,7 @@ class HTTPDownload(): if self.nameDisposition and self.disposition: self.filename = save_join(dirname(self.filename), self.nameDisposition) - + move(init, self.filename) self.info.remove() #remove info file @@ -164,7 +166,7 @@ class HTTPDownload(): chunksCreated = False done = False if self.info.getCount() > 1: # This is a resume, if we were chunked originally assume still can - self.chunkSupport=True + self.chunkSupport = True while 1: #need to create chunks @@ -177,7 +179,7 @@ class HTTPDownload(): chunks = self.info.getCount() - init.range = self.info.getChunkRange(0) + init.setRange(self.info.getChunkRange(0)) for i in range(1, chunks): c = HTTPChunk(i, self, self.info.getChunkRange(i), resume) @@ -191,10 +193,8 @@ class HTTPDownload(): self.log.debug("Invalid curl handle -> closed") c.close() - chunksCreated = True - while 1: ret, num_handles = self.m.perform() if ret != pycurl.E_CALL_MULTI_PERFORM: @@ -215,7 +215,7 @@ class HTTPDownload(): #@TODO KeyBoardInterrupts are seen as finished chunks, #but normally not handled to this process, only in the testcase - + chunksDone.add(curl) if not num_q: lastFinishCheck = t @@ -243,7 +243,7 @@ class HTTPDownload(): if self.abort: raise Abort() - sleep(0.003) #supress busy waiting - limits dl speed to (1 / x) * buffersize + #sleep(0.003) #supress busy waiting - limits dl speed to (1 / x) * buffersize self.m.select(1) failed = False @@ -261,11 +261,11 @@ class HTTPDownload(): if failed: raise BadHeader(failed) self._copyChunks() - + def updateProgress(self): if self.progressNotify: self.progressNotify(self.percent) - + def close(self): """ cleanup """ for chunk in self.chunks: |