diff options
Diffstat (limited to 'module/lib/thrift/server')
-rw-r--r-- | module/lib/thrift/server/THttpServer.py | 87 | ||||
-rw-r--r-- | module/lib/thrift/server/TNonblockingServer.py | 346 | ||||
-rw-r--r-- | module/lib/thrift/server/TProcessPoolServer.py | 118 | ||||
-rw-r--r-- | module/lib/thrift/server/TServer.py | 269 | ||||
-rw-r--r-- | module/lib/thrift/server/__init__.py | 20 |
5 files changed, 0 insertions, 840 deletions
diff --git a/module/lib/thrift/server/THttpServer.py b/module/lib/thrift/server/THttpServer.py deleted file mode 100644 index be54bab94..000000000 --- a/module/lib/thrift/server/THttpServer.py +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import BaseHTTPServer - -from thrift.server import TServer -from thrift.transport import TTransport - - -class ResponseException(Exception): - """Allows handlers to override the HTTP response - - Normally, THttpServer always sends a 200 response. If a handler wants - to override this behavior (e.g., to simulate a misconfigured or - overloaded web server during testing), it can raise a ResponseException. - The function passed to the constructor will be called with the - RequestHandler as its only argument. - """ - def __init__(self, handler): - self.handler = handler - - -class THttpServer(TServer.TServer): - """A simple HTTP-based Thrift server - - This class is not very performant, but it is useful (for example) for - acting as a mock version of an Apache-based PHP Thrift endpoint. - """ - def __init__(self, - processor, - server_address, - inputProtocolFactory, - outputProtocolFactory=None, - server_class=BaseHTTPServer.HTTPServer): - """Set up protocol factories and HTTP server. - - See BaseHTTPServer for server_address. - See TServer for protocol factories. - """ - if outputProtocolFactory is None: - outputProtocolFactory = inputProtocolFactory - - TServer.TServer.__init__(self, processor, None, None, None, - inputProtocolFactory, outputProtocolFactory) - - thttpserver = self - - class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler): - def do_POST(self): - # Don't care about the request path. - itrans = TTransport.TFileObjectTransport(self.rfile) - otrans = TTransport.TFileObjectTransport(self.wfile) - itrans = TTransport.TBufferedTransport( - itrans, int(self.headers['Content-Length'])) - otrans = TTransport.TMemoryBuffer() - iprot = thttpserver.inputProtocolFactory.getProtocol(itrans) - oprot = thttpserver.outputProtocolFactory.getProtocol(otrans) - try: - thttpserver.processor.process(iprot, oprot) - except ResponseException, exn: - exn.handler(self) - else: - self.send_response(200) - self.send_header("content-type", "application/x-thrift") - self.end_headers() - self.wfile.write(otrans.getvalue()) - - self.httpd = server_class(server_address, RequestHander) - - def serve(self): - self.httpd.serve_forever() diff --git a/module/lib/thrift/server/TNonblockingServer.py b/module/lib/thrift/server/TNonblockingServer.py deleted file mode 100644 index fa478d01f..000000000 --- a/module/lib/thrift/server/TNonblockingServer.py +++ /dev/null @@ -1,346 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -"""Implementation of non-blocking server. - -The main idea of the server is to receive and send requests -only from the main thread. - -The thread poool should be sized for concurrent tasks, not -maximum connections -""" -import threading -import socket -import Queue -import select -import struct -import logging - -from thrift.transport import TTransport -from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory - -__all__ = ['TNonblockingServer'] - - -class Worker(threading.Thread): - """Worker is a small helper to process incoming connection.""" - - def __init__(self, queue): - threading.Thread.__init__(self) - self.queue = queue - - def run(self): - """Process queries from task queue, stop if processor is None.""" - while True: - try: - processor, iprot, oprot, otrans, callback = self.queue.get() - if processor is None: - break - processor.process(iprot, oprot) - callback(True, otrans.getvalue()) - except Exception: - logging.exception("Exception while processing request") - callback(False, '') - -WAIT_LEN = 0 -WAIT_MESSAGE = 1 -WAIT_PROCESS = 2 -SEND_ANSWER = 3 -CLOSED = 4 - - -def locked(func): - """Decorator which locks self.lock.""" - def nested(self, *args, **kwargs): - self.lock.acquire() - try: - return func(self, *args, **kwargs) - finally: - self.lock.release() - return nested - - -def socket_exception(func): - """Decorator close object on socket.error.""" - def read(self, *args, **kwargs): - try: - return func(self, *args, **kwargs) - except socket.error: - self.close() - return read - - -class Connection: - """Basic class is represented connection. - - It can be in state: - WAIT_LEN --- connection is reading request len. - WAIT_MESSAGE --- connection is reading request. - WAIT_PROCESS --- connection has just read whole request and - waits for call ready routine. - SEND_ANSWER --- connection is sending answer string (including length - of answer). - CLOSED --- socket was closed and connection should be deleted. - """ - def __init__(self, new_socket, wake_up): - self.socket = new_socket - self.socket.setblocking(False) - self.status = WAIT_LEN - self.len = 0 - self.message = '' - self.lock = threading.Lock() - self.wake_up = wake_up - - def _read_len(self): - """Reads length of request. - - It's a safer alternative to self.socket.recv(4) - """ - read = self.socket.recv(4 - len(self.message)) - if len(read) == 0: - # if we read 0 bytes and self.message is empty, then - # the client closed the connection - if len(self.message) != 0: - logging.error("can't read frame size from socket") - self.close() - return - self.message += read - if len(self.message) == 4: - self.len, = struct.unpack('!i', self.message) - if self.len < 0: - logging.error("negative frame size, it seems client " - "doesn't use FramedTransport") - self.close() - elif self.len == 0: - logging.error("empty frame, it's really strange") - self.close() - else: - self.message = '' - self.status = WAIT_MESSAGE - - @socket_exception - def read(self): - """Reads data from stream and switch state.""" - assert self.status in (WAIT_LEN, WAIT_MESSAGE) - if self.status == WAIT_LEN: - self._read_len() - # go back to the main loop here for simplicity instead of - # falling through, even though there is a good chance that - # the message is already available - elif self.status == WAIT_MESSAGE: - read = self.socket.recv(self.len - len(self.message)) - if len(read) == 0: - logging.error("can't read frame from socket (get %d of " - "%d bytes)" % (len(self.message), self.len)) - self.close() - return - self.message += read - if len(self.message) == self.len: - self.status = WAIT_PROCESS - - @socket_exception - def write(self): - """Writes data from socket and switch state.""" - assert self.status == SEND_ANSWER - sent = self.socket.send(self.message) - if sent == len(self.message): - self.status = WAIT_LEN - self.message = '' - self.len = 0 - else: - self.message = self.message[sent:] - - @locked - def ready(self, all_ok, message): - """Callback function for switching state and waking up main thread. - - This function is the only function witch can be called asynchronous. - - The ready can switch Connection to three states: - WAIT_LEN if request was oneway. - SEND_ANSWER if request was processed in normal way. - CLOSED if request throws unexpected exception. - - The one wakes up main thread. - """ - assert self.status == WAIT_PROCESS - if not all_ok: - self.close() - self.wake_up() - return - self.len = '' - if len(message) == 0: - # it was a oneway request, do not write answer - self.message = '' - self.status = WAIT_LEN - else: - self.message = struct.pack('!i', len(message)) + message - self.status = SEND_ANSWER - self.wake_up() - - @locked - def is_writeable(self): - """Return True if connection should be added to write list of select""" - return self.status == SEND_ANSWER - - # it's not necessary, but... - @locked - def is_readable(self): - """Return True if connection should be added to read list of select""" - return self.status in (WAIT_LEN, WAIT_MESSAGE) - - @locked - def is_closed(self): - """Returns True if connection is closed.""" - return self.status == CLOSED - - def fileno(self): - """Returns the file descriptor of the associated socket.""" - return self.socket.fileno() - - def close(self): - """Closes connection""" - self.status = CLOSED - self.socket.close() - - -class TNonblockingServer: - """Non-blocking server.""" - - def __init__(self, - processor, - lsocket, - inputProtocolFactory=None, - outputProtocolFactory=None, - threads=10): - self.processor = processor - self.socket = lsocket - self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory() - self.out_protocol = outputProtocolFactory or self.in_protocol - self.threads = int(threads) - self.clients = {} - self.tasks = Queue.Queue() - self._read, self._write = socket.socketpair() - self.prepared = False - self._stop = False - - def setNumThreads(self, num): - """Set the number of worker threads that should be created.""" - # implement ThreadPool interface - assert not self.prepared, "Can't change number of threads after start" - self.threads = num - - def prepare(self): - """Prepares server for serve requests.""" - if self.prepared: - return - self.socket.listen() - for _ in xrange(self.threads): - thread = Worker(self.tasks) - thread.setDaemon(True) - thread.start() - self.prepared = True - - def wake_up(self): - """Wake up main thread. - - The server usualy waits in select call in we should terminate one. - The simplest way is using socketpair. - - Select always wait to read from the first socket of socketpair. - - In this case, we can just write anything to the second socket from - socketpair. - """ - self._write.send('1') - - def stop(self): - """Stop the server. - - This method causes the serve() method to return. stop() may be invoked - from within your handler, or from another thread. - - After stop() is called, serve() will return but the server will still - be listening on the socket. serve() may then be called again to resume - processing requests. Alternatively, close() may be called after - serve() returns to close the server socket and shutdown all worker - threads. - """ - self._stop = True - self.wake_up() - - def _select(self): - """Does select on open connections.""" - readable = [self.socket.handle.fileno(), self._read.fileno()] - writable = [] - for i, connection in self.clients.items(): - if connection.is_readable(): - readable.append(connection.fileno()) - if connection.is_writeable(): - writable.append(connection.fileno()) - if connection.is_closed(): - del self.clients[i] - return select.select(readable, writable, readable) - - def handle(self): - """Handle requests. - - WARNING! You must call prepare() BEFORE calling handle() - """ - assert self.prepared, "You have to call prepare before handle" - rset, wset, xset = self._select() - for readable in rset: - if readable == self._read.fileno(): - # don't care i just need to clean readable flag - self._read.recv(1024) - elif readable == self.socket.handle.fileno(): - client = self.socket.accept().handle - self.clients[client.fileno()] = Connection(client, - self.wake_up) - else: - connection = self.clients[readable] - connection.read() - if connection.status == WAIT_PROCESS: - itransport = TTransport.TMemoryBuffer(connection.message) - otransport = TTransport.TMemoryBuffer() - iprot = self.in_protocol.getProtocol(itransport) - oprot = self.out_protocol.getProtocol(otransport) - self.tasks.put([self.processor, iprot, oprot, - otransport, connection.ready]) - for writeable in wset: - self.clients[writeable].write() - for oob in xset: - self.clients[oob].close() - del self.clients[oob] - - def close(self): - """Closes the server.""" - for _ in xrange(self.threads): - self.tasks.put([None, None, None, None, None]) - self.socket.close() - self.prepared = False - - def serve(self): - """Serve requests. - - Serve requests forever, or until stop() is called. - """ - self._stop = False - self.prepare() - while not self._stop: - self.handle() diff --git a/module/lib/thrift/server/TProcessPoolServer.py b/module/lib/thrift/server/TProcessPoolServer.py deleted file mode 100644 index 7a695a883..000000000 --- a/module/lib/thrift/server/TProcessPoolServer.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -import logging -from multiprocessing import Process, Value, Condition, reduction - -from TServer import TServer -from thrift.transport.TTransport import TTransportException - - -class TProcessPoolServer(TServer): - """Server with a fixed size pool of worker subprocesses to service requests - - Note that if you need shared state between the handlers - it's up to you! - Written by Dvir Volk, doat.com - """ - def __init__(self, *args): - TServer.__init__(self, *args) - self.numWorkers = 10 - self.workers = [] - self.isRunning = Value('b', False) - self.stopCondition = Condition() - self.postForkCallback = None - - def setPostForkCallback(self, callback): - if not callable(callback): - raise TypeError("This is not a callback!") - self.postForkCallback = callback - - def setNumWorkers(self, num): - """Set the number of worker threads that should be created""" - self.numWorkers = num - - def workerProcess(self): - """Loop getting clients from the shared queue and process them""" - if self.postForkCallback: - self.postForkCallback() - - while self.isRunning.value: - try: - client = self.serverTransport.accept() - self.serveClient(client) - except (KeyboardInterrupt, SystemExit): - return 0 - except Exception, x: - logging.exception(x) - - def serveClient(self, client): - """Process input/output from a client for as long as possible""" - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - - try: - while True: - self.processor.process(iprot, oprot) - except TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - def serve(self): - """Start workers and put into queue""" - # this is a shared state that can tell the workers to exit when False - self.isRunning.value = True - - # first bind and listen to the port - self.serverTransport.listen() - - # fork the children - for i in range(self.numWorkers): - try: - w = Process(target=self.workerProcess) - w.daemon = True - w.start() - self.workers.append(w) - except Exception, x: - logging.exception(x) - - # wait until the condition is set by stop() - while True: - self.stopCondition.acquire() - try: - self.stopCondition.wait() - break - except (SystemExit, KeyboardInterrupt): - break - except Exception, x: - logging.exception(x) - - self.isRunning.value = False - - def stop(self): - self.isRunning.value = False - self.stopCondition.acquire() - self.stopCondition.notify() - self.stopCondition.release() diff --git a/module/lib/thrift/server/TServer.py b/module/lib/thrift/server/TServer.py deleted file mode 100644 index 2f24842c4..000000000 --- a/module/lib/thrift/server/TServer.py +++ /dev/null @@ -1,269 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import Queue -import logging -import os -import sys -import threading -import traceback - -from thrift.Thrift import TProcessor -from thrift.protocol import TBinaryProtocol -from thrift.transport import TTransport - - -class TServer: - """Base interface for a server, which must have a serve() method. - - Three constructors for all servers: - 1) (processor, serverTransport) - 2) (processor, serverTransport, transportFactory, protocolFactory) - 3) (processor, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) - """ - def __init__(self, *args): - if (len(args) == 2): - self.__initArgs__(args[0], args[1], - TTransport.TTransportFactoryBase(), - TTransport.TTransportFactoryBase(), - TBinaryProtocol.TBinaryProtocolFactory(), - TBinaryProtocol.TBinaryProtocolFactory()) - elif (len(args) == 4): - self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3]) - elif (len(args) == 6): - self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5]) - - def __initArgs__(self, processor, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory): - self.processor = processor - self.serverTransport = serverTransport - self.inputTransportFactory = inputTransportFactory - self.outputTransportFactory = outputTransportFactory - self.inputProtocolFactory = inputProtocolFactory - self.outputProtocolFactory = outputProtocolFactory - - def serve(self): - pass - - -class TSimpleServer(TServer): - """Simple single-threaded server that just pumps around one transport.""" - - def __init__(self, *args): - TServer.__init__(self, *args) - - def serve(self): - self.serverTransport.listen() - while True: - client = self.serverTransport.accept() - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - -class TThreadedServer(TServer): - """Threaded server that spawns a new thread per each connection.""" - - def __init__(self, *args, **kwargs): - TServer.__init__(self, *args) - self.daemon = kwargs.get("daemon", False) - - def serve(self): - self.serverTransport.listen() - while True: - try: - client = self.serverTransport.accept() - t = threading.Thread(target=self.handle, args=(client,)) - t.setDaemon(self.daemon) - t.start() - except KeyboardInterrupt: - raise - except Exception, x: - logging.exception(x) - - def handle(self, client): - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - -class TThreadPoolServer(TServer): - """Server with a fixed size pool of threads which service requests.""" - - def __init__(self, *args, **kwargs): - TServer.__init__(self, *args) - self.clients = Queue.Queue() - self.threads = 10 - self.daemon = kwargs.get("daemon", False) - - def setNumThreads(self, num): - """Set the number of worker threads that should be created""" - self.threads = num - - def serveThread(self): - """Loop around getting clients from the shared queue and process them.""" - while True: - try: - client = self.clients.get() - self.serveClient(client) - except Exception, x: - logging.exception(x) - - def serveClient(self, client): - """Process input/output from a client for as long as possible""" - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - itrans.close() - otrans.close() - - def serve(self): - """Start a fixed number of worker threads and put client into a queue""" - for i in range(self.threads): - try: - t = threading.Thread(target=self.serveThread) - t.setDaemon(self.daemon) - t.start() - except Exception, x: - logging.exception(x) - - # Pump the socket for clients - self.serverTransport.listen() - while True: - try: - client = self.serverTransport.accept() - self.clients.put(client) - except Exception, x: - logging.exception(x) - - -class TForkingServer(TServer): - """A Thrift server that forks a new process for each request - - This is more scalable than the threaded server as it does not cause - GIL contention. - - Note that this has different semantics from the threading server. - Specifically, updates to shared variables will no longer be shared. - It will also not work on windows. - - This code is heavily inspired by SocketServer.ForkingMixIn in the - Python stdlib. - """ - def __init__(self, *args): - TServer.__init__(self, *args) - self.children = [] - - def serve(self): - def try_close(file): - try: - file.close() - except IOError, e: - logging.warning(e, exc_info=True) - - self.serverTransport.listen() - while True: - client = self.serverTransport.accept() - try: - pid = os.fork() - - if pid: # parent - # add before collect, otherwise you race w/ waitpid - self.children.append(pid) - self.collect_children() - - # Parent must close socket or the connection may not get - # closed promptly - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - try_close(itrans) - try_close(otrans) - else: - itrans = self.inputTransportFactory.getTransport(client) - otrans = self.outputTransportFactory.getTransport(client) - - iprot = self.inputProtocolFactory.getProtocol(itrans) - oprot = self.outputProtocolFactory.getProtocol(otrans) - - ecode = 0 - try: - try: - while True: - self.processor.process(iprot, oprot) - except TTransport.TTransportException, tx: - pass - except Exception, e: - logging.exception(e) - ecode = 1 - finally: - try_close(itrans) - try_close(otrans) - - os._exit(ecode) - - except TTransport.TTransportException, tx: - pass - except Exception, x: - logging.exception(x) - - def collect_children(self): - while self.children: - try: - pid, status = os.waitpid(0, os.WNOHANG) - except os.error: - pid = None - - if pid: - self.children.remove(pid) - else: - break diff --git a/module/lib/thrift/server/__init__.py b/module/lib/thrift/server/__init__.py deleted file mode 100644 index 1bf6e254e..000000000 --- a/module/lib/thrift/server/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -__all__ = ['TServer', 'TNonblockingServer'] |