diff options
author | 2014-06-28 16:12:44 +0200 | |
---|---|---|
committer | 2014-06-28 20:23:59 +0200 | |
commit | 2c797aba90ec32fa979dce8c89789309f936ddce (patch) | |
tree | fd08def9e9e9cd9892e8b5743fa91be4ad45b26c /module/lib/thrift/server | |
parent | [Lib] Update simplejson to version 3.5.3 (diff) | |
download | pyload-2c797aba90ec32fa979dce8c89789309f936ddce.tar.xz |
[Lib] Update thrift to version 0.9.1
Diffstat (limited to 'module/lib/thrift/server')
-rw-r--r-- | module/lib/thrift/server/THttpServer.py | 21 | ||||
-rw-r--r-- | module/lib/thrift/server/TNonblockingServer.py | 120 | ||||
-rw-r--r-- | module/lib/thrift/server/TProcessPoolServer.py | 29 | ||||
-rw-r--r-- | module/lib/thrift/server/TServer.py | 37 |
4 files changed, 118 insertions, 89 deletions
diff --git a/module/lib/thrift/server/THttpServer.py b/module/lib/thrift/server/THttpServer.py index 3047d9c00..be54bab94 100644 --- a/module/lib/thrift/server/THttpServer.py +++ b/module/lib/thrift/server/THttpServer.py @@ -22,6 +22,7 @@ import BaseHTTPServer from thrift.server import TServer from thrift.transport import TTransport + class ResponseException(Exception): """Allows handlers to override the HTTP response @@ -39,16 +40,19 @@ class THttpServer(TServer.TServer): """A simple HTTP-based Thrift server This class is not very performant, but it is useful (for example) for - acting as a mock version of an Apache-based PHP Thrift endpoint.""" - - def __init__(self, processor, server_address, - inputProtocolFactory, outputProtocolFactory = None, - server_class = BaseHTTPServer.HTTPServer): + acting as a mock version of an Apache-based PHP Thrift endpoint. + """ + def __init__(self, + processor, + server_address, + inputProtocolFactory, + outputProtocolFactory=None, + server_class=BaseHTTPServer.HTTPServer): """Set up protocol factories and HTTP server. See BaseHTTPServer for server_address. - See TServer for protocol factories.""" - + See TServer for protocol factories. + """ if outputProtocolFactory is None: outputProtocolFactory = inputProtocolFactory @@ -62,7 +66,8 @@ class THttpServer(TServer.TServer): # Don't care about the request path. itrans = TTransport.TFileObjectTransport(self.rfile) otrans = TTransport.TFileObjectTransport(self.wfile) - itrans = TTransport.TBufferedTransport(itrans, int(self.headers['Content-Length'])) + itrans = TTransport.TBufferedTransport( + itrans, int(self.headers['Content-Length'])) otrans = TTransport.TMemoryBuffer() iprot = thttpserver.inputProtocolFactory.getProtocol(itrans) oprot = thttpserver.outputProtocolFactory.getProtocol(otrans) diff --git a/module/lib/thrift/server/TNonblockingServer.py b/module/lib/thrift/server/TNonblockingServer.py index ea348a0b6..fa478d01f 100644 --- a/module/lib/thrift/server/TNonblockingServer.py +++ b/module/lib/thrift/server/TNonblockingServer.py @@ -18,10 +18,11 @@ # """Implementation of non-blocking server. -The main idea of the server is reciving and sending requests -only from main thread. +The main idea of the server is to receive and send requests +only from the main thread. -It also makes thread pool server in tasks terms, not connections. +The thread poool should be sized for concurrent tasks, not +maximum connections """ import threading import socket @@ -35,8 +36,10 @@ from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory __all__ = ['TNonblockingServer'] + class Worker(threading.Thread): """Worker is a small helper to process incoming connection.""" + def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue @@ -60,8 +63,9 @@ WAIT_PROCESS = 2 SEND_ANSWER = 3 CLOSED = 4 + def locked(func): - "Decorator which locks self.lock." + """Decorator which locks self.lock.""" def nested(self, *args, **kwargs): self.lock.acquire() try: @@ -70,8 +74,9 @@ def locked(func): self.lock.release() return nested + def socket_exception(func): - "Decorator close object on socket.error." + """Decorator close object on socket.error.""" def read(self, *args, **kwargs): try: return func(self, *args, **kwargs) @@ -79,16 +84,17 @@ def socket_exception(func): self.close() return read + class Connection: """Basic class is represented connection. - + It can be in state: WAIT_LEN --- connection is reading request len. WAIT_MESSAGE --- connection is reading request. - WAIT_PROCESS --- connection has just read whole request and - waits for call ready routine. + WAIT_PROCESS --- connection has just read whole request and + waits for call ready routine. SEND_ANSWER --- connection is sending answer string (including length - of answer). + of answer). CLOSED --- socket was closed and connection should be deleted. """ def __init__(self, new_socket, wake_up): @@ -102,13 +108,13 @@ class Connection: def _read_len(self): """Reads length of request. - - It's really paranoic routine and it may be replaced by - self.socket.recv(4).""" + + It's a safer alternative to self.socket.recv(4) + """ read = self.socket.recv(4 - len(self.message)) if len(read) == 0: - # if we read 0 bytes and self.message is empty, it means client close - # connection + # if we read 0 bytes and self.message is empty, then + # the client closed the connection if len(self.message) != 0: logging.error("can't read frame size from socket") self.close() @@ -117,8 +123,8 @@ class Connection: if len(self.message) == 4: self.len, = struct.unpack('!i', self.message) if self.len < 0: - logging.error("negative frame size, it seems client"\ - " doesn't use FramedTransport") + logging.error("negative frame size, it seems client " + "doesn't use FramedTransport") self.close() elif self.len == 0: logging.error("empty frame, it's really strange") @@ -139,8 +145,8 @@ class Connection: elif self.status == WAIT_MESSAGE: read = self.socket.recv(self.len - len(self.message)) if len(read) == 0: - logging.error("can't read frame from socket (get %d of %d bytes)" % - (len(self.message), self.len)) + logging.error("can't read frame from socket (get %d of " + "%d bytes)" % (len(self.message), self.len)) self.close() return self.message += read @@ -162,14 +168,14 @@ class Connection: @locked def ready(self, all_ok, message): """Callback function for switching state and waking up main thread. - + This function is the only function witch can be called asynchronous. - + The ready can switch Connection to three states: WAIT_LEN if request was oneway. SEND_ANSWER if request was processed in normal way. CLOSED if request throws unexpected exception. - + The one wakes up main thread. """ assert self.status == WAIT_PROCESS @@ -189,33 +195,39 @@ class Connection: @locked def is_writeable(self): - "Returns True if connection should be added to write list of select." + """Return True if connection should be added to write list of select""" return self.status == SEND_ANSWER # it's not necessary, but... @locked def is_readable(self): - "Returns True if connection should be added to read list of select." + """Return True if connection should be added to read list of select""" return self.status in (WAIT_LEN, WAIT_MESSAGE) @locked def is_closed(self): - "Returns True if connection is closed." + """Returns True if connection is closed.""" return self.status == CLOSED def fileno(self): - "Returns the file descriptor of the associated socket." + """Returns the file descriptor of the associated socket.""" return self.socket.fileno() def close(self): - "Closes connection" + """Closes connection""" self.status = CLOSED self.socket.close() + class TNonblockingServer: """Non-blocking server.""" - def __init__(self, processor, lsocket, inputProtocolFactory=None, - outputProtocolFactory=None, threads=10): + + def __init__(self, + processor, + lsocket, + inputProtocolFactory=None, + outputProtocolFactory=None, + threads=10): self.processor = processor self.socket = lsocket self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() @@ -225,15 +237,18 @@ class TNonblockingServer: self.tasks = Queue.Queue() self._read, self._write = socket.socketpair() self.prepared = False + self._stop = False def setNumThreads(self, num): """Set the number of worker threads that should be created.""" # implement ThreadPool interface - assert not self.prepared, "You can't change number of threads for working server" + assert not self.prepared, "Can't change number of threads after start" self.threads = num def prepare(self): """Prepares server for serve requests.""" + if self.prepared: + return self.socket.listen() for _ in xrange(self.threads): thread = Worker(self.tasks) @@ -243,16 +258,32 @@ class TNonblockingServer: def wake_up(self): """Wake up main thread. - + The server usualy waits in select call in we should terminate one. The simplest way is using socketpair. - + Select always wait to read from the first socket of socketpair. - + In this case, we can just write anything to the second socket from - socketpair.""" + socketpair. + """ self._write.send('1') + def stop(self): + """Stop the server. + + This method causes the serve() method to return. stop() may be invoked + from within your handler, or from another thread. + + After stop() is called, serve() will return but the server will still + be listening on the socket. serve() may then be called again to resume + processing requests. Alternatively, close() may be called after + serve() returns to close the server socket and shutdown all worker + threads. + """ + self._stop = True + self.wake_up() + def _select(self): """Does select on open connections.""" readable = [self.socket.handle.fileno(), self._read.fileno()] @@ -265,21 +296,22 @@ class TNonblockingServer: if connection.is_closed(): del self.clients[i] return select.select(readable, writable, readable) - + def handle(self): """Handle requests. - - WARNING! You must call prepare BEFORE calling handle. + + WARNING! You must call prepare() BEFORE calling handle() """ assert self.prepared, "You have to call prepare before handle" rset, wset, xset = self._select() for readable in rset: if readable == self._read.fileno(): # don't care i just need to clean readable flag - self._read.recv(1024) + self._read.recv(1024) elif readable == self.socket.handle.fileno(): client = self.socket.accept().handle - self.clients[client.fileno()] = Connection(client, self.wake_up) + self.clients[client.fileno()] = Connection(client, + self.wake_up) else: connection = self.clients[readable] connection.read() @@ -288,7 +320,7 @@ class TNonblockingServer: otransport = TTransport.TMemoryBuffer() iprot = self.in_protocol.getProtocol(itransport) oprot = self.out_protocol.getProtocol(otransport) - self.tasks.put([self.processor, iprot, oprot, + self.tasks.put([self.processor, iprot, oprot, otransport, connection.ready]) for writeable in wset: self.clients[writeable].write() @@ -302,9 +334,13 @@ class TNonblockingServer: self.tasks.put([None, None, None, None, None]) self.socket.close() self.prepared = False - + def serve(self): - """Serve forever.""" + """Serve requests. + + Serve requests forever, or until stop() is called. + """ + self._stop = False self.prepare() - while True: + while not self._stop: self.handle() diff --git a/module/lib/thrift/server/TProcessPoolServer.py b/module/lib/thrift/server/TProcessPoolServer.py index 7ed814a88..7a695a883 100644 --- a/module/lib/thrift/server/TProcessPoolServer.py +++ b/module/lib/thrift/server/TProcessPoolServer.py @@ -24,15 +24,14 @@ from multiprocessing import Process, Value, Condition, reduction from TServer import TServer from thrift.transport.TTransport import TTransportException + class TProcessPoolServer(TServer): + """Server with a fixed size pool of worker subprocesses to service requests - """ - Server with a fixed size pool of worker subprocesses which service requests. Note that if you need shared state between the handlers - it's up to you! Written by Dvir Volk, doat.com """ - - def __init__(self, * args): + def __init__(self, *args): TServer.__init__(self, *args) self.numWorkers = 10 self.workers = [] @@ -50,12 +49,11 @@ class TProcessPoolServer(TServer): self.numWorkers = num def workerProcess(self): - """Loop around getting clients from the shared queue and process them.""" - + """Loop getting clients from the shared queue and process them""" if self.postForkCallback: self.postForkCallback() - while self.isRunning.value == True: + while self.isRunning.value: try: client = self.serverTransport.accept() self.serveClient(client) @@ -82,17 +80,15 @@ class TProcessPoolServer(TServer): itrans.close() otrans.close() - def serve(self): - """Start a fixed number of worker threads and put client into a queue""" - - #this is a shared state that can tell the workers to exit when set as false + """Start workers and put into queue""" + # this is a shared state that can tell the workers to exit when False self.isRunning.value = True - #first bind and listen to the port + # first bind and listen to the port self.serverTransport.listen() - #fork the children + # fork the children for i in range(self.numWorkers): try: w = Process(target=self.workerProcess) @@ -102,16 +98,14 @@ class TProcessPoolServer(TServer): except Exception, x: logging.exception(x) - #wait until the condition is set by stop() - + # wait until the condition is set by stop() while True: - self.stopCondition.acquire() try: self.stopCondition.wait() break except (SystemExit, KeyboardInterrupt): - break + break except Exception, x: logging.exception(x) @@ -122,4 +116,3 @@ class TProcessPoolServer(TServer): self.stopCondition.acquire() self.stopCondition.notify() self.stopCondition.release() - diff --git a/module/lib/thrift/server/TServer.py b/module/lib/thrift/server/TServer.py index 8456e2d40..2f24842c4 100644 --- a/module/lib/thrift/server/TServer.py +++ b/module/lib/thrift/server/TServer.py @@ -17,27 +17,28 @@ # under the License. # +import Queue import logging -import sys import os -import traceback +import sys import threading -import Queue +import traceback from thrift.Thrift import TProcessor -from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol +from thrift.transport import TTransport -class TServer: - """Base interface for a server, which must have a serve method.""" +class TServer: + """Base interface for a server, which must have a serve() method. - """ 3 constructors for all servers: + Three constructors for all servers: 1) (processor, serverTransport) 2) (processor, serverTransport, transportFactory, protocolFactory) 3) (processor, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory)""" + inputProtocolFactory, outputProtocolFactory) + """ def __init__(self, *args): if (len(args) == 2): self.__initArgs__(args[0], args[1], @@ -63,8 +64,8 @@ class TServer: def serve(self): pass -class TSimpleServer(TServer): +class TSimpleServer(TServer): """Simple single-threaded server that just pumps around one transport.""" def __init__(self, *args): @@ -89,8 +90,8 @@ class TSimpleServer(TServer): itrans.close() otrans.close() -class TThreadedServer(TServer): +class TThreadedServer(TServer): """Threaded server that spawns a new thread per each connection.""" def __init__(self, *args, **kwargs): @@ -102,7 +103,7 @@ class TThreadedServer(TServer): while True: try: client = self.serverTransport.accept() - t = threading.Thread(target = self.handle, args=(client,)) + t = threading.Thread(target=self.handle, args=(client,)) t.setDaemon(self.daemon) t.start() except KeyboardInterrupt: @@ -126,8 +127,8 @@ class TThreadedServer(TServer): itrans.close() otrans.close() -class TThreadPoolServer(TServer): +class TThreadPoolServer(TServer): """Server with a fixed size pool of threads which service requests.""" def __init__(self, *args, **kwargs): @@ -170,7 +171,7 @@ class TThreadPoolServer(TServer): """Start a fixed number of worker threads and put client into a queue""" for i in range(self.threads): try: - t = threading.Thread(target = self.serveThread) + t = threading.Thread(target=self.serveThread) t.setDaemon(self.daemon) t.start() except Exception, x: @@ -187,9 +188,8 @@ class TThreadPoolServer(TServer): class TForkingServer(TServer): + """A Thrift server that forks a new process for each request - """A Thrift server that forks a new process for each request""" - """ This is more scalable than the threaded server as it does not cause GIL contention. @@ -200,7 +200,6 @@ class TForkingServer(TServer): This code is heavily inspired by SocketServer.ForkingMixIn in the Python stdlib. """ - def __init__(self, *args): TServer.__init__(self, *args) self.children = [] @@ -212,14 +211,13 @@ class TForkingServer(TServer): except IOError, e: logging.warning(e, exc_info=True) - self.serverTransport.listen() while True: client = self.serverTransport.accept() try: pid = os.fork() - if pid: # parent + if pid: # parent # add before collect, otherwise you race w/ waitpid self.children.append(pid) self.collect_children() @@ -258,7 +256,6 @@ class TForkingServer(TServer): except Exception, x: logging.exception(x) - def collect_children(self): while self.children: try: @@ -270,5 +267,3 @@ class TForkingServer(TServer): self.children.remove(pid) else: break - - |