diff options
Diffstat (limited to 'module/lib/thrift/server/TProcessPoolServer.py')
-rw-r--r-- | module/lib/thrift/server/TProcessPoolServer.py | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/module/lib/thrift/server/TProcessPoolServer.py b/module/lib/thrift/server/TProcessPoolServer.py index 7ed814a88..7a695a883 100644 --- a/module/lib/thrift/server/TProcessPoolServer.py +++ b/module/lib/thrift/server/TProcessPoolServer.py @@ -24,15 +24,14 @@ from multiprocessing import Process, Value, Condition, reduction from TServer import TServer from thrift.transport.TTransport import TTransportException + class TProcessPoolServer(TServer): + """Server with a fixed size pool of worker subprocesses to service requests - """ - Server with a fixed size pool of worker subprocesses which service requests. Note that if you need shared state between the handlers - it's up to you! Written by Dvir Volk, doat.com """ - - def __init__(self, * args): + def __init__(self, *args): TServer.__init__(self, *args) self.numWorkers = 10 self.workers = [] @@ -50,12 +49,11 @@ class TProcessPoolServer(TServer): self.numWorkers = num def workerProcess(self): - """Loop around getting clients from the shared queue and process them.""" - + """Loop getting clients from the shared queue and process them""" if self.postForkCallback: self.postForkCallback() - while self.isRunning.value == True: + while self.isRunning.value: try: client = self.serverTransport.accept() self.serveClient(client) @@ -82,17 +80,15 @@ class TProcessPoolServer(TServer): itrans.close() otrans.close() - def serve(self): - """Start a fixed number of worker threads and put client into a queue""" - - #this is a shared state that can tell the workers to exit when set as false + """Start workers and put into queue""" + # this is a shared state that can tell the workers to exit when False self.isRunning.value = True - #first bind and listen to the port + # first bind and listen to the port self.serverTransport.listen() - #fork the children + # fork the children for i in range(self.numWorkers): try: w = Process(target=self.workerProcess) @@ -102,16 +98,14 @@ class TProcessPoolServer(TServer): except Exception, x: logging.exception(x) - #wait until the condition is set by stop() - + # wait until the condition is set by stop() while True: - self.stopCondition.acquire() try: self.stopCondition.wait() break except (SystemExit, KeyboardInterrupt): - break + break except Exception, x: logging.exception(x) @@ -122,4 +116,3 @@ class TProcessPoolServer(TServer): self.stopCondition.acquire() self.stopCondition.notify() self.stopCondition.release() - |