summaryrefslogtreecommitdiffstats
path: root/module/network
diff options
context:
space:
mode:
Diffstat (limited to 'module/network')
-rw-r--r--module/network/FTPBase.py208
-rw-r--r--module/network/HTTPChunk.py31
-rw-r--r--module/network/HTTPDownload.py22
3 files changed, 36 insertions, 225 deletions
diff --git a/module/network/FTPBase.py b/module/network/FTPBase.py
deleted file mode 100644
index d8fc5a20d..000000000
--- a/module/network/FTPBase.py
+++ /dev/null
@@ -1,208 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-"""
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, see <http://www.gnu.org/licenses/>.
-
- @author: mkaay
-"""
-
-from ftplib import FTP
-
-import socket
-import socks
-
-from os.path import getsize
-from urlparse import urlparse
-from urllib2 import _parse_proxy
-
-from helper import *
-
-class FTPBase(FTP):
- sourceAddress = ('', 0)
-
- def setSourceAddress(self, host):
- self.sourceAddress = (host, 0)
-
- def connect(self, host='', port=0, timeout=30, proxies={}):
- if host != '':
- self.host = host
- if port > 0:
- self.port = port
- self.timeout = timeout
-
- proxytype = None
- proxy = None
- if "socks5" in proxies:
- proxytype = socks.PROXY_TYPE_SOCKS5
- proxy = proxies["socks5"]
- elif "socks4" in proxies:
- proxytype = socks.PROXY_TYPE_SOCKS4
- proxy = proxies["socks4"]
- if proxytype:
- self.sock = socks.socksocket()
- t = _parse_proxy(proxy)
- self.sock.setproxy(proxytype, addr=t[3].split(":")[0], port=int(t[3].split(":")[1]), username=t[1], password=t[2])
- else:
- self.sock = socket.socket()
- self.sock.settimeout(self.timeout)
- self.sock.bind(self.sourceAddress)
- self.sock.connect((self.host, self.port))
- self.af = self.sock.family
- self.file = self.sock.makefile('rb')
- self.welcome = self.getresp()
- return self.welcome
-
-class WrappedFTPDeferred(WrappedDeferred):
- pass
-
-class FTPDownload():
- def __init__(self, url, filename, interface=None, bucket=None, proxies={}):
- self.url = url
- self.filename = filename
-
- self.bucket = bucket
- self.interface = interface
- self.proxies = proxies
-
- self.deferred = Deferred()
-
- self.finished = False
- self.size = None
-
- self.speed = 0
-
- self.abort = False
-
- self.arrived = 0
-
- self.startTime = None
- self.endTime = None
-
- self.speed = 0 #byte/sec
- self.speedCalcTime = None
- self.speedCalcLen = 0
-
- self.bufferSize = 16*1024 #tune if performance is poor
-
- self.ftp = FTPBase()
- self.fh = None
-
- @threaded
- def _download(self, offset):
- remotename = self.url.split("/")[-1]
- cmd = "RETR %s" % remotename
-
- self.startTime = inttime()
- self.arrived = offset
- conn, size = self.ftp.ntransfercmd(cmd, None if offset == 0 else offset) #explicit None
- if size:
- self.size = size + offset
- while True:
- if self.abort:
- self.ftp.abort()
- break
- count = self.bufferSize
- if self.bucket:
- count = self.bucket.add(count)
- if count == 0:
- sleep(0.01)
- continue
-
- try:
- data = conn.recv(count)
- except:
- self.deferred.error("timeout")
-
- if self.speedCalcTime < inttime():
- self.speed = self.speedCalcLen
- self.speedCalcTime = inttime()
- self.speedCalcLen = 0
- try:
- self.deferred.progress("percent", 100-int((self.size - self.arrived)/float(self.size)*100))
- except:
- pass
- size = len(data)
- self.speedCalcLen += size
- self.arrived += size
-
- if not data:
- break
-
- self.fh.write(data)
- self.fh.close()
- conn.close()
- self.endTime = inttime()
- if not self.abort:
- print self.ftp.voidresp() #debug
-
- self.ftp.quit()
- if self.abort:
- self.deferred.error("abort")
- elif self.size is None or self.size == self.arrived:
- self.deferred.callback()
- else:
- self.deferred.error("wrong content lenght")
-
- def download(self, resume=False):
- self.fh = open("%s.part" % self.filename, "ab" if resume else "wb")
- offset = 0
- if resume:
- offset = getsize("%s.part" % self.filename)
-
- up = urlparse(self.url)
-
- self.ftp.connect(up.hostname, up.port if up.port else 21, proxies=self.proxies)
- self.ftp.login(up.username, up.password)
- self.ftp.cwd("/".join(up.path.split("/")[:-1]))
- self.ftp.voidcmd('TYPE I')
- self.size = self.ftp.size(self.url.split("/")[-1])
-
- self._download(offset)
- return WrappedFTPDeferred(self, self.deferred)
-
-if __name__ == "__main__":
- import sys
- from Bucket import Bucket
- bucket = Bucket()
- bucket.setRate(200*1000)
- #bucket = None
-
- url = "ftp://mirror.sov.uk.goscomb.net/ubuntu-releases/maverick/ubuntu-10.10-desktop-i386.iso"
-
- finished = False
- def err(*a, **b):
- print a, b
- def callb(*a, **b):
- global finished
- finished = True
- print a, b
-
- print "starting"
-
- dwnld = FTPDownload(url, "ubuntu_ftp.iso")
- d = dwnld.download(resume=True)
- d.addCallback(callb)
- d.addErrback(err)
-
- try:
- while True:
- if not dwnld.finished:
- print dwnld.speed/1024, "kb/s", "size", dwnld.arrived, "/", dwnld.size#, int(float(dwnld.arrived)/dwnld.size*100), "%"
- if finished:
- print "- finished"
- break
- sleep(1)
- except KeyboardInterrupt:
- dwnld.abort = True
- sys.exit()
diff --git a/module/network/HTTPChunk.py b/module/network/HTTPChunk.py
index 440115c77..680b982d3 100644
--- a/module/network/HTTPChunk.py
+++ b/module/network/HTTPChunk.py
@@ -28,6 +28,7 @@ from HTTPRequest import HTTPRequest
class WrongFormat(Exception):
pass
+
class ChunkInfo():
def __init__(self, name):
self.name = name
@@ -115,6 +116,7 @@ class ChunkInfo():
def getChunkRange(self, index):
return self.chunks[index][1]
+
class HTTPChunk(HTTPRequest):
def __init__(self, id, parent, range=None, resume=False):
self.id = id
@@ -123,6 +125,7 @@ class HTTPChunk(HTTPRequest):
self.resume = resume
self.log = parent.log
+ self.size = range[1] - range[0] if range else -1
self.arrived = 0
self.lastURL = self.p.referer
@@ -136,11 +139,13 @@ class HTTPChunk(HTTPRequest):
self.initHandle()
self.setInterface(self.p.options["interface"], self.p.options["proxies"], self.p.options["ipv6"])
- self.BOMChecked = False
- # check and remove byte order mark
+ self.BOMChecked = False # check and remove byte order mark
self.rep = None
+ self.sleep = 0.000
+ self.lastSize = 0
+
@property
def cj(self):
return self.p.cj
@@ -218,11 +223,21 @@ class HTTPChunk(HTTPRequest):
if self.p.bucket:
sleep(self.p.bucket.consumed(size))
- elif size < 5000: #@TODO nice to have: traffic sharping algr. which calculates sleep time to reduce cpu load
- #sleep if chunk size gets low, to avoid many function calls and hope chunksize gets bigger
- sleep(0.007)
+ else:
+ # Avoid small buffers: increase the sleep time slowly if the buffer size gets smaller,
+ # otherwise reduce the sleep time by a percentage (values are based on tests).
+ # In general this saves CPU time without reducing bandwidth too much.
- if self.range and self.arrived > (self.range[1] - self.range[0]):
+ if size < self.lastSize:
+ self.sleep += 0.002
+ else:
+ self.sleep *= 0.7
+
+ self.lastSize = size
+
+ sleep(self.sleep)
+
+ if self.range and self.arrived > self.size:
return 0 #close if we have enough data
@@ -244,6 +259,10 @@ class HTTPChunk(HTTPRequest):
self.headerParsed = True
+ def setRange(self, range):
+ self.range = range
+ self.size = range[1] - range[0]
+
def close(self):
""" closes everything, unusable after this """
if self.fp: self.fp.close()
diff --git a/module/network/HTTPDownload.py b/module/network/HTTPDownload.py
index ac3252f68..f616b16b5 100644
--- a/module/network/HTTPDownload.py
+++ b/module/network/HTTPDownload.py
@@ -33,6 +33,7 @@ from module.utils import save_join
class HTTPDownload():
""" loads a url http + ftp """
+
def __init__(self, url, filename, get={}, post={}, referer=None, cj=None, bucket=None,
options={}, progressNotify=None, disposition=False):
self.url = url
@@ -69,7 +70,7 @@ class HTTPDownload():
self.lastArrived = []
self.speeds = []
self.lastSpeeds = [0, 0]
-
+
self.progressNotify = progressNotify
@property
@@ -93,7 +94,8 @@ class HTTPDownload():
fo = open(init, "rb+") #first chunkfile
for i in range(1, self.info.getCount()):
#input file
- fo.seek(self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks
+ fo.seek(
+ self.info.getChunkRange(i - 1)[1] + 1) #seek to beginning of chunk, to get rid of overlapping chunks
fname = "%s.chunk%d" % (self.filename, i)
fi = open(fname, "rb")
buf = 32 * 1024
@@ -113,7 +115,7 @@ class HTTPDownload():
if self.nameDisposition and self.disposition:
self.filename = save_join(dirname(self.filename), self.nameDisposition)
-
+
move(init, self.filename)
self.info.remove() #remove info file
@@ -164,7 +166,7 @@ class HTTPDownload():
chunksCreated = False
done = False
if self.info.getCount() > 1: # This is a resume, if we were chunked originally assume still can
- self.chunkSupport=True
+ self.chunkSupport = True
while 1:
#need to create chunks
@@ -177,7 +179,7 @@ class HTTPDownload():
chunks = self.info.getCount()
- init.range = self.info.getChunkRange(0)
+ init.setRange(self.info.getChunkRange(0))
for i in range(1, chunks):
c = HTTPChunk(i, self, self.info.getChunkRange(i), resume)
@@ -191,10 +193,8 @@ class HTTPDownload():
self.log.debug("Invalid curl handle -> closed")
c.close()
-
chunksCreated = True
-
while 1:
ret, num_handles = self.m.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
@@ -215,7 +215,7 @@ class HTTPDownload():
#@TODO KeyBoardInterrupts are seen as finished chunks,
#but normally not handled to this process, only in the testcase
-
+
chunksDone.add(curl)
if not num_q:
lastFinishCheck = t
@@ -243,7 +243,7 @@ class HTTPDownload():
if self.abort:
raise Abort()
- sleep(0.003) #supress busy waiting - limits dl speed to (1 / x) * buffersize
+ #sleep(0.003) #suppress busy waiting - limits dl speed to (1 / x) * buffersize
self.m.select(1)
failed = False
@@ -261,11 +261,11 @@ class HTTPDownload():
if failed: raise BadHeader(failed)
self._copyChunks()
-
+
def updateProgress(self):
if self.progressNotify:
self.progressNotify(self.percent)
-
+
def close(self):
""" cleanup """
for chunk in self.chunks: