path: root/lib/thrift/server
diff options
authorGravatar Walter Purcaro <> 2015-04-07 22:22:18 +0200
committerGravatar Walter Purcaro <> 2015-04-07 22:22:18 +0200
commitd2fe85670726901da627490da4155af972c1a62e (patch)
treeb8931d070a51b6d8b1dabe881f54504f9d9ef6de /lib/thrift/server
parentUpdate user-agent (diff)
parentfix gui (diff)
Merge branch 'pr/n1_ardi69' into 0.4.10
Diffstat (limited to 'lib/thrift/server')
5 files changed, 811 insertions, 0 deletions
diff --git a/lib/thrift/server/ b/lib/thrift/server/
new file mode 100644
index 000000000..3047d9c00
--- /dev/null
+++ b/lib/thrift/server/
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import BaseHTTPServer
+from thrift.server import TServer
+from thrift.transport import TTransport
+class ResponseException(Exception):
+ """Allows handlers to override the HTTP response
+ Normally, THttpServer always sends a 200 response. If a handler wants
+ to override this behavior (e.g., to simulate a misconfigured or
+ overloaded web server during testing), it can raise a ResponseException.
+ The function passed to the constructor will be called with the
+ RequestHandler as its only argument.
+ """
+ def __init__(self, handler):
+ self.handler = handler
+class THttpServer(TServer.TServer):
+ """A simple HTTP-based Thrift server
+ This class is not very performant, but it is useful (for example) for
+ acting as a mock version of an Apache-based PHP Thrift endpoint."""
+ def __init__(self, processor, server_address,
+ inputProtocolFactory, outputProtocolFactory = None,
+ server_class = BaseHTTPServer.HTTPServer):
+ """Set up protocol factories and HTTP server.
+ See BaseHTTPServer for server_address.
+ See TServer for protocol factories."""
+ if outputProtocolFactory is None:
+ outputProtocolFactory = inputProtocolFactory
+ TServer.TServer.__init__(self, processor, None, None, None,
+ inputProtocolFactory, outputProtocolFactory)
+ thttpserver = self
+ class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ # Don't care about the request path.
+ itrans = TTransport.TFileObjectTransport(self.rfile)
+ otrans = TTransport.TFileObjectTransport(self.wfile)
+ itrans = TTransport.TBufferedTransport(itrans, int(self.headers['Content-Length']))
+ otrans = TTransport.TMemoryBuffer()
+ iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
+ oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
+ try:
+ thttpserver.processor.process(iprot, oprot)
+ except ResponseException, exn:
+ exn.handler(self)
+ else:
+ self.send_response(200)
+ self.send_header("content-type", "application/x-thrift")
+ self.end_headers()
+ self.wfile.write(otrans.getvalue())
+ self.httpd = server_class(server_address, RequestHander)
+ def serve(self):
+ self.httpd.serve_forever()
diff --git a/lib/thrift/server/ b/lib/thrift/server/
new file mode 100644
index 000000000..ea348a0b6
--- /dev/null
+++ b/lib/thrift/server/
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Implementation of non-blocking server.
+The main idea of the server is reciving and sending requests
+only from main thread.
+It also makes thread pool server in tasks terms, not connections.
+import threading
+import socket
+import Queue
+import select
+import struct
+import logging
+from thrift.transport import TTransport
+from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
+__all__ = ['TNonblockingServer']
+class Worker(threading.Thread):
+ """Worker is a small helper to process incoming connection."""
+ def __init__(self, queue):
+ threading.Thread.__init__(self)
+ self.queue = queue
+ def run(self):
+ """Process queries from task queue, stop if processor is None."""
+ while True:
+ try:
+ processor, iprot, oprot, otrans, callback = self.queue.get()
+ if processor is None:
+ break
+ processor.process(iprot, oprot)
+ callback(True, otrans.getvalue())
+ except Exception:
+ logging.exception("Exception while processing request")
+ callback(False, '')
+def locked(func):
+ "Decorator which locks self.lock."
+ def nested(self, *args, **kwargs):
+ self.lock.acquire()
+ try:
+ return func(self, *args, **kwargs)
+ finally:
+ self.lock.release()
+ return nested
+def socket_exception(func):
+ "Decorator close object on socket.error."
+ def read(self, *args, **kwargs):
+ try:
+ return func(self, *args, **kwargs)
+ except socket.error:
+ self.close()
+ return read
+class Connection:
+ """Basic class is represented connection.
+ It can be in state:
+ WAIT_LEN --- connection is reading request len.
+ WAIT_MESSAGE --- connection is reading request.
+ WAIT_PROCESS --- connection has just read whole request and
+ waits for call ready routine.
+ SEND_ANSWER --- connection is sending answer string (including length
+ of answer).
+ CLOSED --- socket was closed and connection should be deleted.
+ """
+ def __init__(self, new_socket, wake_up):
+ self.socket = new_socket
+ self.socket.setblocking(False)
+ self.status = WAIT_LEN
+ self.len = 0
+ self.message = ''
+ self.lock = threading.Lock()
+ self.wake_up = wake_up
+ def _read_len(self):
+ """Reads length of request.
+ It's really paranoic routine and it may be replaced by
+ self.socket.recv(4)."""
+ read = self.socket.recv(4 - len(self.message))
+ if len(read) == 0:
+ # if we read 0 bytes and self.message is empty, it means client close
+ # connection
+ if len(self.message) != 0:
+ logging.error("can't read frame size from socket")
+ self.close()
+ return
+ self.message += read
+ if len(self.message) == 4:
+ self.len, = struct.unpack('!i', self.message)
+ if self.len < 0:
+ logging.error("negative frame size, it seems client"\
+ " doesn't use FramedTransport")
+ self.close()
+ elif self.len == 0:
+ logging.error("empty frame, it's really strange")
+ self.close()
+ else:
+ self.message = ''
+ self.status = WAIT_MESSAGE
+ @socket_exception
+ def read(self):
+ """Reads data from stream and switch state."""
+ assert self.status in (WAIT_LEN, WAIT_MESSAGE)
+ if self.status == WAIT_LEN:
+ self._read_len()
+ # go back to the main loop here for simplicity instead of
+ # falling through, even though there is a good chance that
+ # the message is already available
+ elif self.status == WAIT_MESSAGE:
+ read = self.socket.recv(self.len - len(self.message))
+ if len(read) == 0:
+ logging.error("can't read frame from socket (get %d of %d bytes)" %
+ (len(self.message), self.len))
+ self.close()
+ return
+ self.message += read
+ if len(self.message) == self.len:
+ self.status = WAIT_PROCESS
+ @socket_exception
+ def write(self):
+ """Writes data from socket and switch state."""
+ assert self.status == SEND_ANSWER
+ sent = self.socket.send(self.message)
+ if sent == len(self.message):
+ self.status = WAIT_LEN
+ self.message = ''
+ self.len = 0
+ else:
+ self.message = self.message[sent:]
+ @locked
+ def ready(self, all_ok, message):
+ """Callback function for switching state and waking up main thread.
+ This function is the only function witch can be called asynchronous.
+ The ready can switch Connection to three states:
+ WAIT_LEN if request was oneway.
+ SEND_ANSWER if request was processed in normal way.
+ CLOSED if request throws unexpected exception.
+ The one wakes up main thread.
+ """
+ assert self.status == WAIT_PROCESS
+ if not all_ok:
+ self.close()
+ self.wake_up()
+ return
+ self.len = ''
+ if len(message) == 0:
+ # it was a oneway request, do not write answer
+ self.message = ''
+ self.status = WAIT_LEN
+ else:
+ self.message = struct.pack('!i', len(message)) + message
+ self.status = SEND_ANSWER
+ self.wake_up()
+ @locked
+ def is_writeable(self):
+ "Returns True if connection should be added to write list of select."
+ return self.status == SEND_ANSWER
+ # it's not necessary, but...
+ @locked
+ def is_readable(self):
+ "Returns True if connection should be added to read list of select."
+ return self.status in (WAIT_LEN, WAIT_MESSAGE)
+ @locked
+ def is_closed(self):
+ "Returns True if connection is closed."
+ return self.status == CLOSED
+ def fileno(self):
+ "Returns the file descriptor of the associated socket."
+ return self.socket.fileno()
+ def close(self):
+ "Closes connection"
+ self.status = CLOSED
+ self.socket.close()
+class TNonblockingServer:
+ """Non-blocking server."""
+ def __init__(self, processor, lsocket, inputProtocolFactory=None,
+ outputProtocolFactory=None, threads=10):
+ self.processor = processor
+ self.socket = lsocket
+ self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
+ self.out_protocol = outputProtocolFactory or self.in_protocol
+ self.threads = int(threads)
+ self.clients = {}
+ self.tasks = Queue.Queue()
+ self._read, self._write = socket.socketpair()
+ self.prepared = False
+ def setNumThreads(self, num):
+ """Set the number of worker threads that should be created."""
+ # implement ThreadPool interface
+ assert not self.prepared, "You can't change number of threads for working server"
+ self.threads = num
+ def prepare(self):
+ """Prepares server for serve requests."""
+ self.socket.listen()
+ for _ in xrange(self.threads):
+ thread = Worker(self.tasks)
+ thread.setDaemon(True)
+ thread.start()
+ self.prepared = True
+ def wake_up(self):
+ """Wake up main thread.
+ The server usualy waits in select call in we should terminate one.
+ The simplest way is using socketpair.
+ Select always wait to read from the first socket of socketpair.
+ In this case, we can just write anything to the second socket from
+ socketpair."""
+ self._write.send('1')
+ def _select(self):
+ """Does select on open connections."""
+ readable = [self.socket.handle.fileno(), self._read.fileno()]
+ writable = []
+ for i, connection in self.clients.items():
+ if connection.is_readable():
+ readable.append(connection.fileno())
+ if connection.is_writeable():
+ writable.append(connection.fileno())
+ if connection.is_closed():
+ del self.clients[i]
+ return, writable, readable)
+ def handle(self):
+ """Handle requests.
+ WARNING! You must call prepare BEFORE calling handle.
+ """
+ assert self.prepared, "You have to call prepare before handle"
+ rset, wset, xset = self._select()
+ for readable in rset:
+ if readable == self._read.fileno():
+ # don't care i just need to clean readable flag
+ self._read.recv(1024)
+ elif readable == self.socket.handle.fileno():
+ client = self.socket.accept().handle
+ self.clients[client.fileno()] = Connection(client, self.wake_up)
+ else:
+ connection = self.clients[readable]
+ if connection.status == WAIT_PROCESS:
+ itransport = TTransport.TMemoryBuffer(connection.message)
+ otransport = TTransport.TMemoryBuffer()
+ iprot = self.in_protocol.getProtocol(itransport)
+ oprot = self.out_protocol.getProtocol(otransport)
+ self.tasks.put([self.processor, iprot, oprot,
+ otransport, connection.ready])
+ for writeable in wset:
+ self.clients[writeable].write()
+ for oob in xset:
+ self.clients[oob].close()
+ del self.clients[oob]
+ def close(self):
+ """Closes the server."""
+ for _ in xrange(self.threads):
+ self.tasks.put([None, None, None, None, None])
+ self.socket.close()
+ self.prepared = False
+ def serve(self):
+ """Serve forever."""
+ self.prepare()
+ while True:
+ self.handle()
diff --git a/lib/thrift/server/ b/lib/thrift/server/
new file mode 100644
index 000000000..7ed814a88
--- /dev/null
+++ b/lib/thrift/server/
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from multiprocessing import Process, Value, Condition, reduction
+from TServer import TServer
+from thrift.transport.TTransport import TTransportException
+class TProcessPoolServer(TServer):
+ """
+ Server with a fixed size pool of worker subprocesses which service requests.
+ Note that if you need shared state between the handlers - it's up to you!
+ Written by Dvir Volk,
+ """
+ def __init__(self, * args):
+ TServer.__init__(self, *args)
+ self.numWorkers = 10
+ self.workers = []
+ self.isRunning = Value('b', False)
+ self.stopCondition = Condition()
+ self.postForkCallback = None
+ def setPostForkCallback(self, callback):
+ if not callable(callback):
+ raise TypeError("This is not a callback!")
+ self.postForkCallback = callback
+ def setNumWorkers(self, num):
+ """Set the number of worker threads that should be created"""
+ self.numWorkers = num
+ def workerProcess(self):
+ """Loop around getting clients from the shared queue and process them."""
+ if self.postForkCallback:
+ self.postForkCallback()
+ while self.isRunning.value == True:
+ try:
+ client = self.serverTransport.accept()
+ self.serveClient(client)
+ except (KeyboardInterrupt, SystemExit):
+ return 0
+ except Exception, x:
+ logging.exception(x)
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransportException, tx:
+ pass
+ except Exception, x:
+ logging.exception(x)
+ itrans.close()
+ otrans.close()
+ def serve(self):
+ """Start a fixed number of worker threads and put client into a queue"""
+ #this is a shared state that can tell the workers to exit when set as false
+ self.isRunning.value = True
+ #first bind and listen to the port
+ self.serverTransport.listen()
+ #fork the children
+ for i in range(self.numWorkers):
+ try:
+ w = Process(target=self.workerProcess)
+ w.daemon = True
+ w.start()
+ self.workers.append(w)
+ except Exception, x:
+ logging.exception(x)
+ #wait until the condition is set by stop()
+ while True:
+ self.stopCondition.acquire()
+ try:
+ self.stopCondition.wait()
+ break
+ except (SystemExit, KeyboardInterrupt):
+ break
+ except Exception, x:
+ logging.exception(x)
+ self.isRunning.value = False
+ def stop(self):
+ self.isRunning.value = False
+ self.stopCondition.acquire()
+ self.stopCondition.notify()
+ self.stopCondition.release()
diff --git a/lib/thrift/server/ b/lib/thrift/server/
new file mode 100644
index 000000000..8456e2d40
--- /dev/null
+++ b/lib/thrift/server/
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import sys
+import os
+import traceback
+import threading
+import Queue
+from thrift.Thrift import TProcessor
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+class TServer:
+ """Base interface for a server, which must have a serve method."""
+ """ 3 constructors for all servers:
+ 1) (processor, serverTransport)
+ 2) (processor, serverTransport, transportFactory, protocolFactory)
+ 3) (processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory)"""
+ def __init__(self, *args):
+ if (len(args) == 2):
+ self.__initArgs__(args[0], args[1],
+ TTransport.TTransportFactoryBase(),
+ TTransport.TTransportFactoryBase(),
+ TBinaryProtocol.TBinaryProtocolFactory(),
+ TBinaryProtocol.TBinaryProtocolFactory())
+ elif (len(args) == 4):
+ self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
+ elif (len(args) == 6):
+ self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
+ def __initArgs__(self, processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory):
+ self.processor = processor
+ self.serverTransport = serverTransport
+ self.inputTransportFactory = inputTransportFactory
+ self.outputTransportFactory = outputTransportFactory
+ self.inputProtocolFactory = inputProtocolFactory
+ self.outputProtocolFactory = outputProtocolFactory
+ def serve(self):
+ pass
+class TSimpleServer(TServer):
+ """Simple single-threaded server that just pumps around one transport."""
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ logging.exception(x)
+ itrans.close()
+ otrans.close()
+class TThreadedServer(TServer):
+ """Threaded server that spawns a new thread per each connection."""
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.daemon = kwargs.get("daemon", False)
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ t = threading.Thread(target = self.handle, args=(client,))
+ t.setDaemon(self.daemon)
+ t.start()
+ except KeyboardInterrupt:
+ raise
+ except Exception, x:
+ logging.exception(x)
+ def handle(self, client):
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ logging.exception(x)
+ itrans.close()
+ otrans.close()
+class TThreadPoolServer(TServer):
+ """Server with a fixed size pool of threads which service requests."""
+ def __init__(self, *args, **kwargs):
+ TServer.__init__(self, *args)
+ self.clients = Queue.Queue()
+ self.threads = 10
+ self.daemon = kwargs.get("daemon", False)
+ def setNumThreads(self, num):
+ """Set the number of worker threads that should be created"""
+ self.threads = num
+ def serveThread(self):
+ """Loop around getting clients from the shared queue and process them."""
+ while True:
+ try:
+ client = self.clients.get()
+ self.serveClient(client)
+ except Exception, x:
+ logging.exception(x)
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ logging.exception(x)
+ itrans.close()
+ otrans.close()
+ def serve(self):
+ """Start a fixed number of worker threads and put client into a queue"""
+ for i in range(self.threads):
+ try:
+ t = threading.Thread(target = self.serveThread)
+ t.setDaemon(self.daemon)
+ t.start()
+ except Exception, x:
+ logging.exception(x)
+ # Pump the socket for clients
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ self.clients.put(client)
+ except Exception, x:
+ logging.exception(x)
+class TForkingServer(TServer):
+ """A Thrift server that forks a new process for each request"""
+ """
+ This is more scalable than the threaded server as it does not cause
+ GIL contention.
+ Note that this has different semantics from the threading server.
+ Specifically, updates to shared variables will no longer be shared.
+ It will also not work on windows.
+ This code is heavily inspired by SocketServer.ForkingMixIn in the
+ Python stdlib.
+ """
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ self.children = []
+ def serve(self):
+ def try_close(file):
+ try:
+ file.close()
+ except IOError, e:
+ logging.warning(e, exc_info=True)
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ try:
+ pid = os.fork()
+ if pid: # parent
+ # add before collect, otherwise you race w/ waitpid
+ self.children.append(pid)
+ self.collect_children()
+ # Parent must close socket or the connection may not get
+ # closed promptly
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ try_close(itrans)
+ try_close(otrans)
+ else:
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+ ecode = 0
+ try:
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, e:
+ logging.exception(e)
+ ecode = 1
+ finally:
+ try_close(itrans)
+ try_close(otrans)
+ os._exit(ecode)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ logging.exception(x)
+ def collect_children(self):
+ while self.children:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except os.error:
+ pid = None
+ if pid:
+ self.children.remove(pid)
+ else:
+ break
diff --git a/lib/thrift/server/ b/lib/thrift/server/
new file mode 100644
index 000000000..1bf6e254e
--- /dev/null
+++ b/lib/thrift/server/
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+__all__ = ['TServer', 'TNonblockingServer']