summaryrefslogtreecommitdiffstats
path: root/module/lib/thrift/server/TNonblockingServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'module/lib/thrift/server/TNonblockingServer.py')
-rw-r--r--module/lib/thrift/server/TNonblockingServer.py120
1 files changed, 78 insertions, 42 deletions
diff --git a/module/lib/thrift/server/TNonblockingServer.py b/module/lib/thrift/server/TNonblockingServer.py
index ea348a0b6..fa478d01f 100644
--- a/module/lib/thrift/server/TNonblockingServer.py
+++ b/module/lib/thrift/server/TNonblockingServer.py
@@ -18,10 +18,11 @@
#
"""Implementation of non-blocking server.
-The main idea of the server is reciving and sending requests
-only from main thread.
+The main idea of the server is to receive and send requests
+only from the main thread.
-It also makes thread pool server in tasks terms, not connections.
+The thread poool should be sized for concurrent tasks, not
+maximum connections
"""
import threading
import socket
@@ -35,8 +36,10 @@ from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
__all__ = ['TNonblockingServer']
+
class Worker(threading.Thread):
"""Worker is a small helper to process incoming connection."""
+
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
@@ -60,8 +63,9 @@ WAIT_PROCESS = 2
SEND_ANSWER = 3
CLOSED = 4
+
def locked(func):
- "Decorator which locks self.lock."
+ """Decorator which locks self.lock."""
def nested(self, *args, **kwargs):
self.lock.acquire()
try:
@@ -70,8 +74,9 @@ def locked(func):
self.lock.release()
return nested
+
def socket_exception(func):
- "Decorator close object on socket.error."
+ """Decorator close object on socket.error."""
def read(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
@@ -79,16 +84,17 @@ def socket_exception(func):
self.close()
return read
+
class Connection:
"""Basic class is represented connection.
-
+
It can be in state:
WAIT_LEN --- connection is reading request len.
WAIT_MESSAGE --- connection is reading request.
- WAIT_PROCESS --- connection has just read whole request and
- waits for call ready routine.
+ WAIT_PROCESS --- connection has just read whole request and
+ waits for call ready routine.
SEND_ANSWER --- connection is sending answer string (including length
- of answer).
+ of answer).
CLOSED --- socket was closed and connection should be deleted.
"""
def __init__(self, new_socket, wake_up):
@@ -102,13 +108,13 @@ class Connection:
def _read_len(self):
"""Reads length of request.
-
- It's really paranoic routine and it may be replaced by
- self.socket.recv(4)."""
+
+ It's a safer alternative to self.socket.recv(4)
+ """
read = self.socket.recv(4 - len(self.message))
if len(read) == 0:
- # if we read 0 bytes and self.message is empty, it means client close
- # connection
+ # if we read 0 bytes and self.message is empty, then
+ # the client closed the connection
if len(self.message) != 0:
logging.error("can't read frame size from socket")
self.close()
@@ -117,8 +123,8 @@ class Connection:
if len(self.message) == 4:
self.len, = struct.unpack('!i', self.message)
if self.len < 0:
- logging.error("negative frame size, it seems client"\
- " doesn't use FramedTransport")
+ logging.error("negative frame size, it seems client "
+ "doesn't use FramedTransport")
self.close()
elif self.len == 0:
logging.error("empty frame, it's really strange")
@@ -139,8 +145,8 @@ class Connection:
elif self.status == WAIT_MESSAGE:
read = self.socket.recv(self.len - len(self.message))
if len(read) == 0:
- logging.error("can't read frame from socket (get %d of %d bytes)" %
- (len(self.message), self.len))
+ logging.error("can't read frame from socket (get %d of "
+ "%d bytes)" % (len(self.message), self.len))
self.close()
return
self.message += read
@@ -162,14 +168,14 @@ class Connection:
@locked
def ready(self, all_ok, message):
"""Callback function for switching state and waking up main thread.
-
+
This function is the only function witch can be called asynchronous.
-
+
The ready can switch Connection to three states:
WAIT_LEN if request was oneway.
SEND_ANSWER if request was processed in normal way.
CLOSED if request throws unexpected exception.
-
+
The one wakes up main thread.
"""
assert self.status == WAIT_PROCESS
@@ -189,33 +195,39 @@ class Connection:
@locked
def is_writeable(self):
- "Returns True if connection should be added to write list of select."
+ """Return True if connection should be added to write list of select"""
return self.status == SEND_ANSWER
# it's not necessary, but...
@locked
def is_readable(self):
- "Returns True if connection should be added to read list of select."
+ """Return True if connection should be added to read list of select"""
return self.status in (WAIT_LEN, WAIT_MESSAGE)
@locked
def is_closed(self):
- "Returns True if connection is closed."
+ """Returns True if connection is closed."""
return self.status == CLOSED
def fileno(self):
- "Returns the file descriptor of the associated socket."
+ """Returns the file descriptor of the associated socket."""
return self.socket.fileno()
def close(self):
- "Closes connection"
+ """Closes connection"""
self.status = CLOSED
self.socket.close()
+
class TNonblockingServer:
"""Non-blocking server."""
- def __init__(self, processor, lsocket, inputProtocolFactory=None,
- outputProtocolFactory=None, threads=10):
+
+ def __init__(self,
+ processor,
+ lsocket,
+ inputProtocolFactory=None,
+ outputProtocolFactory=None,
+ threads=10):
self.processor = processor
self.socket = lsocket
self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
@@ -225,15 +237,18 @@ class TNonblockingServer:
self.tasks = Queue.Queue()
self._read, self._write = socket.socketpair()
self.prepared = False
+ self._stop = False
def setNumThreads(self, num):
"""Set the number of worker threads that should be created."""
# implement ThreadPool interface
- assert not self.prepared, "You can't change number of threads for working server"
+ assert not self.prepared, "Can't change number of threads after start"
self.threads = num
def prepare(self):
"""Prepares server for serve requests."""
+ if self.prepared:
+ return
self.socket.listen()
for _ in xrange(self.threads):
thread = Worker(self.tasks)
@@ -243,16 +258,32 @@ class TNonblockingServer:
def wake_up(self):
"""Wake up main thread.
-
+
The server usualy waits in select call in we should terminate one.
The simplest way is using socketpair.
-
+
Select always wait to read from the first socket of socketpair.
-
+
In this case, we can just write anything to the second socket from
- socketpair."""
+ socketpair.
+ """
self._write.send('1')
+ def stop(self):
+ """Stop the server.
+
+ This method causes the serve() method to return. stop() may be invoked
+ from within your handler, or from another thread.
+
+ After stop() is called, serve() will return but the server will still
+ be listening on the socket. serve() may then be called again to resume
+ processing requests. Alternatively, close() may be called after
+ serve() returns to close the server socket and shutdown all worker
+ threads.
+ """
+ self._stop = True
+ self.wake_up()
+
def _select(self):
"""Does select on open connections."""
readable = [self.socket.handle.fileno(), self._read.fileno()]
@@ -265,21 +296,22 @@ class TNonblockingServer:
if connection.is_closed():
del self.clients[i]
return select.select(readable, writable, readable)
-
+
def handle(self):
"""Handle requests.
-
- WARNING! You must call prepare BEFORE calling handle.
+
+ WARNING! You must call prepare() BEFORE calling handle()
"""
assert self.prepared, "You have to call prepare before handle"
rset, wset, xset = self._select()
for readable in rset:
if readable == self._read.fileno():
# don't care i just need to clean readable flag
- self._read.recv(1024)
+ self._read.recv(1024)
elif readable == self.socket.handle.fileno():
client = self.socket.accept().handle
- self.clients[client.fileno()] = Connection(client, self.wake_up)
+ self.clients[client.fileno()] = Connection(client,
+ self.wake_up)
else:
connection = self.clients[readable]
connection.read()
@@ -288,7 +320,7 @@ class TNonblockingServer:
otransport = TTransport.TMemoryBuffer()
iprot = self.in_protocol.getProtocol(itransport)
oprot = self.out_protocol.getProtocol(otransport)
- self.tasks.put([self.processor, iprot, oprot,
+ self.tasks.put([self.processor, iprot, oprot,
otransport, connection.ready])
for writeable in wset:
self.clients[writeable].write()
@@ -302,9 +334,13 @@ class TNonblockingServer:
self.tasks.put([None, None, None, None, None])
self.socket.close()
self.prepared = False
-
+
def serve(self):
- """Serve forever."""
+ """Serve requests.
+
+ Serve requests forever, or until stop() is called.
+ """
+ self._stop = False
self.prepare()
- while True:
+ while not self._stop:
self.handle()