diff options
Diffstat (limited to 'module/lib/thrift/server/TNonblockingServer.py')
-rw-r--r-- | module/lib/thrift/server/TNonblockingServer.py | 120 |
1 files changed, 78 insertions, 42 deletions
diff --git a/module/lib/thrift/server/TNonblockingServer.py b/module/lib/thrift/server/TNonblockingServer.py index ea348a0b6..fa478d01f 100644 --- a/module/lib/thrift/server/TNonblockingServer.py +++ b/module/lib/thrift/server/TNonblockingServer.py @@ -18,10 +18,11 @@ # """Implementation of non-blocking server. -The main idea of the server is reciving and sending requests -only from main thread. +The main idea of the server is to receive and send requests +only from the main thread. -It also makes thread pool server in tasks terms, not connections. +The thread poool should be sized for concurrent tasks, not +maximum connections """ import threading import socket @@ -35,8 +36,10 @@ from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory __all__ = ['TNonblockingServer'] + class Worker(threading.Thread): """Worker is a small helper to process incoming connection.""" + def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue @@ -60,8 +63,9 @@ WAIT_PROCESS = 2 SEND_ANSWER = 3 CLOSED = 4 + def locked(func): - "Decorator which locks self.lock." + """Decorator which locks self.lock.""" def nested(self, *args, **kwargs): self.lock.acquire() try: @@ -70,8 +74,9 @@ def locked(func): self.lock.release() return nested + def socket_exception(func): - "Decorator close object on socket.error." + """Decorator close object on socket.error.""" def read(self, *args, **kwargs): try: return func(self, *args, **kwargs) @@ -79,16 +84,17 @@ def socket_exception(func): self.close() return read + class Connection: """Basic class is represented connection. - + It can be in state: WAIT_LEN --- connection is reading request len. WAIT_MESSAGE --- connection is reading request. - WAIT_PROCESS --- connection has just read whole request and - waits for call ready routine. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. SEND_ANSWER --- connection is sending answer string (including length - of answer). + of answer). CLOSED --- socket was closed and connection should be deleted. """ def __init__(self, new_socket, wake_up): @@ -102,13 +108,13 @@ class Connection: def _read_len(self): """Reads length of request. - - It's really paranoic routine and it may be replaced by - self.socket.recv(4).""" + + It's a safer alternative to self.socket.recv(4) + """ read = self.socket.recv(4 - len(self.message)) if len(read) == 0: - # if we read 0 bytes and self.message is empty, it means client close - # connection + # if we read 0 bytes and self.message is empty, then + # the client closed the connection if len(self.message) != 0: logging.error("can't read frame size from socket") self.close() @@ -117,8 +123,8 @@ class Connection: if len(self.message) == 4: self.len, = struct.unpack('!i', self.message) if self.len < 0: - logging.error("negative frame size, it seems client"\ - " doesn't use FramedTransport") + logging.error("negative frame size, it seems client " + "doesn't use FramedTransport") self.close() elif self.len == 0: logging.error("empty frame, it's really strange") @@ -139,8 +145,8 @@ class Connection: elif self.status == WAIT_MESSAGE: read = self.socket.recv(self.len - len(self.message)) if len(read) == 0: - logging.error("can't read frame from socket (get %d of %d bytes)" % - (len(self.message), self.len)) + logging.error("can't read frame from socket (get %d of " + "%d bytes)" % (len(self.message), self.len)) self.close() return self.message += read @@ -162,14 +168,14 @@ class Connection: @locked def ready(self, all_ok, message): """Callback function for switching state and waking up main thread. - + This function is the only function witch can be called asynchronous. - + The ready can switch Connection to three states: WAIT_LEN if request was oneway. SEND_ANSWER if request was processed in normal way. CLOSED if request throws unexpected exception. - + The one wakes up main thread. """ assert self.status == WAIT_PROCESS @@ -189,33 +195,39 @@ class Connection: @locked def is_writeable(self): - "Returns True if connection should be added to write list of select." + """Return True if connection should be added to write list of select""" return self.status == SEND_ANSWER # it's not necessary, but... @locked def is_readable(self): - "Returns True if connection should be added to read list of select." + """Return True if connection should be added to read list of select""" return self.status in (WAIT_LEN, WAIT_MESSAGE) @locked def is_closed(self): - "Returns True if connection is closed." + """Returns True if connection is closed.""" return self.status == CLOSED def fileno(self): - "Returns the file descriptor of the associated socket." + """Returns the file descriptor of the associated socket.""" return self.socket.fileno() def close(self): - "Closes connection" + """Closes connection""" self.status = CLOSED self.socket.close() + class TNonblockingServer: """Non-blocking server.""" - def __init__(self, processor, lsocket, inputProtocolFactory=None, - outputProtocolFactory=None, threads=10): + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): self.processor = processor self.socket = lsocket self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() @@ -225,15 +237,18 @@ class TNonblockingServer: self.tasks = Queue.Queue() self._read, self._write = socket.socketpair() self.prepared = False + self._stop = False def setNumThreads(self, num): """Set the number of worker threads that should be created.""" # implement ThreadPool interface - assert not self.prepared, "You can't change number of threads for working server" + assert not self.prepared, "Can't change number of threads after start" self.threads = num def prepare(self): """Prepares server for serve requests.""" + if self.prepared: + return self.socket.listen() for _ in xrange(self.threads): thread = Worker(self.tasks) @@ -243,16 +258,32 @@ class TNonblockingServer: def wake_up(self): """Wake up main thread. - + The server usualy waits in select call in we should terminate one. The simplest way is using socketpair. - + Select always wait to read from the first socket of socketpair. - + In this case, we can just write anything to the second socket from - socketpair.""" + socketpair. + """ self._write.send('1') + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + def _select(self): """Does select on open connections.""" readable = [self.socket.handle.fileno(), self._read.fileno()] @@ -265,21 +296,22 @@ class TNonblockingServer: if connection.is_closed(): del self.clients[i] return select.select(readable, writable, readable) - + def handle(self): """Handle requests. - - WARNING! You must call prepare BEFORE calling handle. + + WARNING! You must call prepare() BEFORE calling handle() """ assert self.prepared, "You have to call prepare before handle" rset, wset, xset = self._select() for readable in rset: if readable == self._read.fileno(): # don't care i just need to clean readable flag - self._read.recv(1024) + self._read.recv(1024) elif readable == self.socket.handle.fileno(): client = self.socket.accept().handle - self.clients[client.fileno()] = Connection(client, self.wake_up) + self.clients[client.fileno()] = Connection(client, + self.wake_up) else: connection = self.clients[readable] connection.read() @@ -288,7 +320,7 @@ class TNonblockingServer: otransport = TTransport.TMemoryBuffer() iprot = self.in_protocol.getProtocol(itransport) oprot = self.out_protocol.getProtocol(otransport) - self.tasks.put([self.processor, iprot, oprot, + self.tasks.put([self.processor, iprot, oprot, otransport, connection.ready]) for writeable in wset: self.clients[writeable].write() @@ -302,9 +334,13 @@ class TNonblockingServer: self.tasks.put([None, None, None, None, None]) self.socket.close() self.prepared = False - + def serve(self): - """Serve forever.""" + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False self.prepare() - while True: + while not self._stop: self.handle() |