summaryrefslogtreecommitdiffstats
path: root/module/lib/thrift/server/TProcessPoolServer.py
diff options
context:
space:
mode:
Diffstat (limited to 'module/lib/thrift/server/TProcessPoolServer.py')
-rw-r--r--module/lib/thrift/server/TProcessPoolServer.py29
1 files changed, 11 insertions, 18 deletions
diff --git a/module/lib/thrift/server/TProcessPoolServer.py b/module/lib/thrift/server/TProcessPoolServer.py
index 7ed814a88..7a695a883 100644
--- a/module/lib/thrift/server/TProcessPoolServer.py
+++ b/module/lib/thrift/server/TProcessPoolServer.py
@@ -24,15 +24,14 @@ from multiprocessing import Process, Value, Condition, reduction
from TServer import TServer
from thrift.transport.TTransport import TTransportException
+
class TProcessPoolServer(TServer):
+ """Server with a fixed size pool of worker subprocesses to service requests
- """
- Server with a fixed size pool of worker subprocesses which service requests.
Note that if you need shared state between the handlers - it's up to you!
Written by Dvir Volk, doat.com
"""
-
- def __init__(self, * args):
+ def __init__(self, *args):
TServer.__init__(self, *args)
self.numWorkers = 10
self.workers = []
@@ -50,12 +49,11 @@ class TProcessPoolServer(TServer):
self.numWorkers = num
def workerProcess(self):
- """Loop around getting clients from the shared queue and process them."""
-
+ """Loop getting clients from the shared queue and process them"""
if self.postForkCallback:
self.postForkCallback()
- while self.isRunning.value == True:
+ while self.isRunning.value:
try:
client = self.serverTransport.accept()
self.serveClient(client)
@@ -82,17 +80,15 @@ class TProcessPoolServer(TServer):
itrans.close()
otrans.close()
-
def serve(self):
- """Start a fixed number of worker threads and put client into a queue"""
-
- #this is a shared state that can tell the workers to exit when set as false
+ """Start workers and put into queue"""
+ # this is a shared state that can tell the workers to exit when False
self.isRunning.value = True
- #first bind and listen to the port
+ # first bind and listen to the port
self.serverTransport.listen()
- #fork the children
+ # fork the children
for i in range(self.numWorkers):
try:
w = Process(target=self.workerProcess)
@@ -102,16 +98,14 @@ class TProcessPoolServer(TServer):
except Exception, x:
logging.exception(x)
- #wait until the condition is set by stop()
-
+ # wait until the condition is set by stop()
while True:
-
self.stopCondition.acquire()
try:
self.stopCondition.wait()
break
except (SystemExit, KeyboardInterrupt):
- break
+ break
except Exception, x:
logging.exception(x)
@@ -122,4 +116,3 @@ class TProcessPoolServer(TServer):
self.stopCondition.acquire()
self.stopCondition.notify()
self.stopCondition.release()
-