summaryrefslogtreecommitdiffstats
path: root/module/lib/thrift/transport
diff options
context:
space:
mode:
authorGravatar mkaay <mkaay@mkaay.de> 2011-02-07 00:32:21 +0100
committerGravatar mkaay <mkaay@mkaay.de> 2011-02-07 00:32:21 +0100
commit69f2d3660e44bd138d24b6cf97b234bc2efc1c78 (patch)
treeaf276b8ed0b5cac5a62e6949a00d607afcc94e5d /module/lib/thrift/transport
parentclosed #231 (diff)
downloadpyload-69f2d3660e44bd138d24b6cf97b234bc2efc1c78.tar.xz
added Thrift backend
Diffstat (limited to 'module/lib/thrift/transport')
-rw-r--r--module/lib/thrift/transport/THttpClient.py126
-rw-r--r--module/lib/thrift/transport/TSocket.py163
-rw-r--r--module/lib/thrift/transport/TTransport.py331
-rw-r--r--module/lib/thrift/transport/TTwisted.py219
-rw-r--r--module/lib/thrift/transport/__init__.py20
5 files changed, 859 insertions, 0 deletions
diff --git a/module/lib/thrift/transport/THttpClient.py b/module/lib/thrift/transport/THttpClient.py
new file mode 100644
index 000000000..50269785c
--- /dev/null
+++ b/module/lib/thrift/transport/THttpClient.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from TTransport import *
+from cStringIO import StringIO
+
+import urlparse
+import httplib
+import warnings
+import socket
+
+class THttpClient(TTransportBase):
+
+ """Http implementation of TTransport base."""
+
+ def __init__(self, uri_or_host, port=None, path=None):
+ """THttpClient supports two different types constructor parameters.
+
+ THttpClient(host, port, path) - deprecated
+ THttpClient(uri)
+
+ Only the second supports https."""
+
+ if port is not None:
+ warnings.warn("Please use the THttpClient('http://host:port/path') syntax", DeprecationWarning, stacklevel=2)
+ self.host = uri_or_host
+ self.port = port
+ assert path
+ self.path = path
+ self.scheme = 'http'
+ else:
+ parsed = urlparse.urlparse(uri_or_host)
+ self.scheme = parsed.scheme
+ assert self.scheme in ('http', 'https')
+ if self.scheme == 'http':
+ self.port = parsed.port or httplib.HTTP_PORT
+ elif self.scheme == 'https':
+ self.port = parsed.port or httplib.HTTPS_PORT
+ self.host = parsed.hostname
+ self.path = parsed.path
+ if parsed.query:
+ self.path += '?%s' % parsed.query
+ self.__wbuf = StringIO()
+ self.__http = None
+ self.__timeout = None
+
+ def open(self):
+ if self.scheme == 'http':
+ self.__http = httplib.HTTP(self.host, self.port)
+ else:
+ self.__http = httplib.HTTPS(self.host, self.port)
+
+ def close(self):
+ self.__http.close()
+ self.__http = None
+
+ def isOpen(self):
+ return self.__http != None
+
+ def setTimeout(self, ms):
+ if not hasattr(socket, 'getdefaulttimeout'):
+ raise NotImplementedError
+
+ if ms is None:
+ self.__timeout = None
+ else:
+ self.__timeout = ms/1000.0
+
+ def read(self, sz):
+ return self.__http.file.read(sz)
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def __withTimeout(f):
+ def _f(*args, **kwargs):
+ orig_timeout = socket.getdefaulttimeout()
+ socket.setdefaulttimeout(args[0].__timeout)
+ result = f(*args, **kwargs)
+ socket.setdefaulttimeout(orig_timeout)
+ return result
+ return _f
+
+ def flush(self):
+ if self.isOpen():
+ self.close()
+ self.open();
+
+ # Pull data out of buffer
+ data = self.__wbuf.getvalue()
+ self.__wbuf = StringIO()
+
+ # HTTP request
+ self.__http.putrequest('POST', self.path)
+
+ # Write headers
+ self.__http.putheader('Host', self.host)
+ self.__http.putheader('Content-Type', 'application/x-thrift')
+ self.__http.putheader('Content-Length', str(len(data)))
+ self.__http.endheaders()
+
+ # Write payload
+ self.__http.send(data)
+
+ # Get reply to flush the request
+ self.code, self.message, self.headers = self.__http.getreply()
+
+ # Decorate if we know how to timeout
+ if hasattr(socket, 'getdefaulttimeout'):
+ flush = __withTimeout(flush)
diff --git a/module/lib/thrift/transport/TSocket.py b/module/lib/thrift/transport/TSocket.py
new file mode 100644
index 000000000..d77e358a2
--- /dev/null
+++ b/module/lib/thrift/transport/TSocket.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from TTransport import *
+import os
+import errno
+import socket
+import sys
+
+class TSocketBase(TTransportBase):
+ def _resolveAddr(self):
+ if self._unix_socket is not None:
+ return [(socket.AF_UNIX, socket.SOCK_STREAM, None, None, self._unix_socket)]
+ else:
+ return socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE | socket.AI_ADDRCONFIG)
+
+ def close(self):
+ if self.handle:
+ self.handle.close()
+ self.handle = None
+
+class TSocket(TSocketBase):
+ """Socket implementation of TTransport base."""
+
+ def __init__(self, host='localhost', port=9090, unix_socket=None):
+ """Initialize a TSocket
+
+ @param host(str) The host to connect to.
+ @param port(int) The (TCP) port to connect to.
+ @param unix_socket(str) The filename of a unix socket to connect to.
+ (host and port will be ignored.)
+ """
+
+ self.host = host
+ self.port = port
+ self.handle = None
+ self._unix_socket = unix_socket
+ self._timeout = None
+
+ def setHandle(self, h):
+ self.handle = h
+
+ def isOpen(self):
+ return self.handle != None
+
+ def setTimeout(self, ms):
+ if ms is None:
+ self._timeout = None
+ else:
+ self._timeout = ms/1000.0
+
+ if (self.handle != None):
+ self.handle.settimeout(self._timeout)
+
+ def open(self):
+ try:
+ res0 = self._resolveAddr()
+ for res in res0:
+ self.handle = socket.socket(res[0], res[1])
+ self.handle.settimeout(self._timeout)
+ try:
+ self.handle.connect(res[4])
+ except socket.error, e:
+ if res is not res0[-1]:
+ continue
+ else:
+ raise e
+ break
+ except socket.error, e:
+ if self._unix_socket:
+ message = 'Could not connect to socket %s' % self._unix_socket
+ else:
+ message = 'Could not connect to %s:%d' % (self.host, self.port)
+ raise TTransportException(type=TTransportException.NOT_OPEN, message=message)
+
+ def read(self, sz):
+ try:
+ buff = self.handle.recv(sz)
+ except socket.error, e:
+ if (e.args[0] == errno.ECONNRESET and
+ (sys.platform == 'darwin' or sys.platform.startswith('freebsd'))):
+ # freebsd and Mach don't follow POSIX semantic of recv
+ # and fail with ECONNRESET if peer performed shutdown.
+ # See corresponding comment and code in TSocket::read()
+ # in lib/cpp/src/transport/TSocket.cpp.
+ self.close()
+ # Trigger the check to raise the END_OF_FILE exception below.
+ buff = ''
+ else:
+ raise
+ if len(buff) == 0:
+ raise TTransportException(type=TTransportException.END_OF_FILE, message='TSocket read 0 bytes')
+ return buff
+
+ def write(self, buff):
+ if not self.handle:
+ raise TTransportException(type=TTransportException.NOT_OPEN, message='Transport not open')
+ sent = 0
+ have = len(buff)
+ while sent < have:
+ plus = self.handle.send(buff)
+ if plus == 0:
+ raise TTransportException(type=TTransportException.END_OF_FILE, message='TSocket sent 0 bytes')
+ sent += plus
+ buff = buff[plus:]
+
+ def flush(self):
+ pass
+
+class TServerSocket(TSocketBase, TServerTransportBase):
+ """Socket implementation of TServerTransport base."""
+
+ def __init__(self, port=9090, unix_socket=None):
+ self.host = None
+ self.port = port
+ self._unix_socket = unix_socket
+ self.handle = None
+
+ def listen(self):
+ res0 = self._resolveAddr()
+ for res in res0:
+ if res[0] is socket.AF_INET6 or res is res0[-1]:
+ break
+
+ # We need remove the old unix socket if the file exists and
+ # nobody is listening on it.
+ if self._unix_socket:
+ tmp = socket.socket(res[0], res[1])
+ try:
+ tmp.connect(res[4])
+ except socket.error, err:
+ eno, message = err.args
+ if eno == errno.ECONNREFUSED:
+ os.unlink(res[4])
+
+ self.handle = socket.socket(res[0], res[1])
+ self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if hasattr(self.handle, 'set_timeout'):
+ self.handle.set_timeout(None)
+ self.handle.bind(res[4])
+ self.handle.listen(128)
+
+ def accept(self):
+ client, addr = self.handle.accept()
+ result = TSocket()
+ result.setHandle(client)
+ return result
diff --git a/module/lib/thrift/transport/TTransport.py b/module/lib/thrift/transport/TTransport.py
new file mode 100644
index 000000000..12e51a9bf
--- /dev/null
+++ b/module/lib/thrift/transport/TTransport.py
@@ -0,0 +1,331 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from cStringIO import StringIO
+from struct import pack,unpack
+from thrift.Thrift import TException
+
+class TTransportException(TException):
+
+ """Custom Transport Exception class"""
+
+ UNKNOWN = 0
+ NOT_OPEN = 1
+ ALREADY_OPEN = 2
+ TIMED_OUT = 3
+ END_OF_FILE = 4
+
+ def __init__(self, type=UNKNOWN, message=None):
+ TException.__init__(self, message)
+ self.type = type
+
+class TTransportBase:
+
+ """Base class for Thrift transport layer."""
+
+ def isOpen(self):
+ pass
+
+ def open(self):
+ pass
+
+ def close(self):
+ pass
+
+ def read(self, sz):
+ pass
+
+ def readAll(self, sz):
+ buff = ''
+ have = 0
+ while (have < sz):
+ chunk = self.read(sz-have)
+ have += len(chunk)
+ buff += chunk
+
+ if len(chunk) == 0:
+ raise EOFError()
+
+ return buff
+
+ def write(self, buf):
+ pass
+
+ def flush(self):
+ pass
+
+# This class should be thought of as an interface.
+class CReadableTransport:
+ """base class for transports that are readable from C"""
+
+ # TODO(dreiss): Think about changing this interface to allow us to use
+ # a (Python, not c) StringIO instead, because it allows
+ # you to write after reading.
+
+ # NOTE: This is a classic class, so properties will NOT work
+ # correctly for setting.
+ @property
+ def cstringio_buf(self):
+ """A cStringIO buffer that contains the current chunk we are reading."""
+ pass
+
+ def cstringio_refill(self, partialread, reqlen):
+ """Refills cstringio_buf.
+
+ Returns the currently used buffer (which can but need not be the same as
+ the old cstringio_buf). partialread is what the C code has read from the
+ buffer, and should be inserted into the buffer before any more reads. The
+ return value must be a new, not borrowed reference. Something along the
+ lines of self._buf should be fine.
+
+ If reqlen bytes can't be read, throw EOFError.
+ """
+ pass
+
+class TServerTransportBase:
+
+ """Base class for Thrift server transports."""
+
+ def listen(self):
+ pass
+
+ def accept(self):
+ pass
+
+ def close(self):
+ pass
+
+class TTransportFactoryBase:
+
+ """Base class for a Transport Factory"""
+
+ def getTransport(self, trans):
+ return trans
+
+class TBufferedTransportFactory:
+
+ """Factory transport that builds buffered transports"""
+
+ def getTransport(self, trans):
+ buffered = TBufferedTransport(trans)
+ return buffered
+
+
+class TBufferedTransport(TTransportBase,CReadableTransport):
+
+ """Class that wraps another transport and buffers its I/O.
+
+ The implementation uses a (configurable) fixed-size read buffer
+ but buffers all writes until a flush is performed.
+ """
+
+ DEFAULT_BUFFER = 4096
+
+ def __init__(self, trans, rbuf_size = DEFAULT_BUFFER):
+ self.__trans = trans
+ self.__wbuf = StringIO()
+ self.__rbuf = StringIO("")
+ self.__rbuf_size = rbuf_size
+
+ def isOpen(self):
+ return self.__trans.isOpen()
+
+ def open(self):
+ return self.__trans.open()
+
+ def close(self):
+ return self.__trans.close()
+
+ def read(self, sz):
+ ret = self.__rbuf.read(sz)
+ if len(ret) != 0:
+ return ret
+
+ self.__rbuf = StringIO(self.__trans.read(max(sz, self.__rbuf_size)))
+ return self.__rbuf.read(sz)
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ out = self.__wbuf.getvalue()
+ # reset wbuf before write/flush to preserve state on underlying failure
+ self.__wbuf = StringIO()
+ self.__trans.write(out)
+ self.__trans.flush()
+
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self.__rbuf
+
+ def cstringio_refill(self, partialread, reqlen):
+ retstring = partialread
+ if reqlen < self.__rbuf_size:
+ # try to make a read of as much as we can.
+ retstring += self.__trans.read(self.__rbuf_size)
+
+ # but make sure we do read reqlen bytes.
+ if len(retstring) < reqlen:
+ retstring += self.__trans.readAll(reqlen - len(retstring))
+
+ self.__rbuf = StringIO(retstring)
+ return self.__rbuf
+
+class TMemoryBuffer(TTransportBase, CReadableTransport):
+ """Wraps a cStringIO object as a TTransport.
+
+ NOTE: Unlike the C++ version of this class, you cannot write to it
+ then immediately read from it. If you want to read from a
+ TMemoryBuffer, you must either pass a string to the constructor.
+ TODO(dreiss): Make this work like the C++ version.
+ """
+
+ def __init__(self, value=None):
+ """value -- a value to read from for stringio
+
+ If value is set, this will be a transport for reading,
+ otherwise, it is for writing"""
+ if value is not None:
+ self._buffer = StringIO(value)
+ else:
+ self._buffer = StringIO()
+
+ def isOpen(self):
+ return not self._buffer.closed
+
+ def open(self):
+ pass
+
+ def close(self):
+ self._buffer.close()
+
+ def read(self, sz):
+ return self._buffer.read(sz)
+
+ def write(self, buf):
+ self._buffer.write(buf)
+
+ def flush(self):
+ pass
+
+ def getvalue(self):
+ return self._buffer.getvalue()
+
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self._buffer
+
+ def cstringio_refill(self, partialread, reqlen):
+ # only one shot at reading...
+ raise EOFError()
+
+class TFramedTransportFactory:
+
+ """Factory transport that builds framed transports"""
+
+ def getTransport(self, trans):
+ framed = TFramedTransport(trans)
+ return framed
+
+
+class TFramedTransport(TTransportBase, CReadableTransport):
+
+ """Class that wraps another transport and frames its I/O when writing."""
+
+ def __init__(self, trans,):
+ self.__trans = trans
+ self.__rbuf = StringIO()
+ self.__wbuf = StringIO()
+
+ def isOpen(self):
+ return self.__trans.isOpen()
+
+ def open(self):
+ return self.__trans.open()
+
+ def close(self):
+ return self.__trans.close()
+
+ def read(self, sz):
+ ret = self.__rbuf.read(sz)
+ if len(ret) != 0:
+ return ret
+
+ self.readFrame()
+ return self.__rbuf.read(sz)
+
+ def readFrame(self):
+ buff = self.__trans.readAll(4)
+ sz, = unpack('!i', buff)
+ self.__rbuf = StringIO(self.__trans.readAll(sz))
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ wout = self.__wbuf.getvalue()
+ wsz = len(wout)
+ # reset wbuf before write/flush to preserve state on underlying failure
+ self.__wbuf = StringIO()
+ # N.B.: Doing this string concatenation is WAY cheaper than making
+ # two separate calls to the underlying socket object. Socket writes in
+ # Python turn out to be REALLY expensive, but it seems to do a pretty
+ # good job of managing string buffer operations without excessive copies
+ buf = pack("!i", wsz) + wout
+ self.__trans.write(buf)
+ self.__trans.flush()
+
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self.__rbuf
+
+ def cstringio_refill(self, prefix, reqlen):
+ # self.__rbuf will already be empty here because fastbinary doesn't
+ # ask for a refill until the previous buffer is empty. Therefore,
+ # we can start reading new frames immediately.
+ while len(prefix) < reqlen:
+ self.readFrame()
+ prefix += self.__rbuf.getvalue()
+ self.__rbuf = StringIO(prefix)
+ return self.__rbuf
+
+
+class TFileObjectTransport(TTransportBase):
+ """Wraps a file-like object to make it work as a Thrift transport."""
+
+ def __init__(self, fileobj):
+ self.fileobj = fileobj
+
+ def isOpen(self):
+ return True
+
+ def close(self):
+ self.fileobj.close()
+
+ def read(self, sz):
+ return self.fileobj.read(sz)
+
+ def write(self, buf):
+ self.fileobj.write(buf)
+
+ def flush(self):
+ self.fileobj.flush()
diff --git a/module/lib/thrift/transport/TTwisted.py b/module/lib/thrift/transport/TTwisted.py
new file mode 100644
index 000000000..b6dcb4e0b
--- /dev/null
+++ b/module/lib/thrift/transport/TTwisted.py
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from zope.interface import implements, Interface, Attribute
+from twisted.internet.protocol import Protocol, ServerFactory, ClientFactory, \
+ connectionDone
+from twisted.internet import defer
+from twisted.protocols import basic
+from twisted.python import log
+from twisted.web import server, resource, http
+
+from thrift.transport import TTransport
+from cStringIO import StringIO
+
+
+class TMessageSenderTransport(TTransport.TTransportBase):
+
+ def __init__(self):
+ self.__wbuf = StringIO()
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ msg = self.__wbuf.getvalue()
+ self.__wbuf = StringIO()
+ self.sendMessage(msg)
+
+ def sendMessage(self, message):
+ raise NotImplementedError
+
+
+class TCallbackTransport(TMessageSenderTransport):
+
+ def __init__(self, func):
+ TMessageSenderTransport.__init__(self)
+ self.func = func
+
+ def sendMessage(self, message):
+ self.func(message)
+
+
+class ThriftClientProtocol(basic.Int32StringReceiver):
+
+ MAX_LENGTH = 2 ** 31 - 1
+
+ def __init__(self, client_class, iprot_factory, oprot_factory=None):
+ self._client_class = client_class
+ self._iprot_factory = iprot_factory
+ if oprot_factory is None:
+ self._oprot_factory = iprot_factory
+ else:
+ self._oprot_factory = oprot_factory
+
+ self.recv_map = {}
+ self.started = defer.Deferred()
+
+ def dispatch(self, msg):
+ self.sendString(msg)
+
+ def connectionMade(self):
+ tmo = TCallbackTransport(self.dispatch)
+ self.client = self._client_class(tmo, self._oprot_factory)
+ self.started.callback(self.client)
+
+ def connectionLost(self, reason=connectionDone):
+ for k,v in self.client._reqs.iteritems():
+ tex = TTransport.TTransportException(
+ type=TTransport.TTransportException.END_OF_FILE,
+ message='Connection closed')
+ v.errback(tex)
+
+ def stringReceived(self, frame):
+ tr = TTransport.TMemoryBuffer(frame)
+ iprot = self._iprot_factory.getProtocol(tr)
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+
+ try:
+ method = self.recv_map[fname]
+ except KeyError:
+ method = getattr(self.client, 'recv_' + fname)
+ self.recv_map[fname] = method
+
+ method(iprot, mtype, rseqid)
+
+
+class ThriftServerProtocol(basic.Int32StringReceiver):
+
+ MAX_LENGTH = 2 ** 31 - 1
+
+ def dispatch(self, msg):
+ self.sendString(msg)
+
+ def processError(self, error):
+ self.transport.loseConnection()
+
+ def processOk(self, _, tmo):
+ msg = tmo.getvalue()
+
+ if len(msg) > 0:
+ self.dispatch(msg)
+
+ def stringReceived(self, frame):
+ tmi = TTransport.TMemoryBuffer(frame)
+ tmo = TTransport.TMemoryBuffer()
+
+ iprot = self.factory.iprot_factory.getProtocol(tmi)
+ oprot = self.factory.oprot_factory.getProtocol(tmo)
+
+ d = self.factory.processor.process(iprot, oprot)
+ d.addCallbacks(self.processOk, self.processError,
+ callbackArgs=(tmo,))
+
+
+class IThriftServerFactory(Interface):
+
+ processor = Attribute("Thrift processor")
+
+ iprot_factory = Attribute("Input protocol factory")
+
+ oprot_factory = Attribute("Output protocol factory")
+
+
+class IThriftClientFactory(Interface):
+
+ client_class = Attribute("Thrift client class")
+
+ iprot_factory = Attribute("Input protocol factory")
+
+ oprot_factory = Attribute("Output protocol factory")
+
+
+class ThriftServerFactory(ServerFactory):
+
+ implements(IThriftServerFactory)
+
+ protocol = ThriftServerProtocol
+
+ def __init__(self, processor, iprot_factory, oprot_factory=None):
+ self.processor = processor
+ self.iprot_factory = iprot_factory
+ if oprot_factory is None:
+ self.oprot_factory = iprot_factory
+ else:
+ self.oprot_factory = oprot_factory
+
+
+class ThriftClientFactory(ClientFactory):
+
+ implements(IThriftClientFactory)
+
+ protocol = ThriftClientProtocol
+
+ def __init__(self, client_class, iprot_factory, oprot_factory=None):
+ self.client_class = client_class
+ self.iprot_factory = iprot_factory
+ if oprot_factory is None:
+ self.oprot_factory = iprot_factory
+ else:
+ self.oprot_factory = oprot_factory
+
+ def buildProtocol(self, addr):
+ p = self.protocol(self.client_class, self.iprot_factory,
+ self.oprot_factory)
+ p.factory = self
+ return p
+
+
+class ThriftResource(resource.Resource):
+
+ allowedMethods = ('POST',)
+
+ def __init__(self, processor, inputProtocolFactory,
+ outputProtocolFactory=None):
+ resource.Resource.__init__(self)
+ self.inputProtocolFactory = inputProtocolFactory
+ if outputProtocolFactory is None:
+ self.outputProtocolFactory = inputProtocolFactory
+ else:
+ self.outputProtocolFactory = outputProtocolFactory
+ self.processor = processor
+
+ def getChild(self, path, request):
+ return self
+
+ def _cbProcess(self, _, request, tmo):
+ msg = tmo.getvalue()
+ request.setResponseCode(http.OK)
+ request.setHeader("content-type", "application/x-thrift")
+ request.write(msg)
+ request.finish()
+
+ def render_POST(self, request):
+ request.content.seek(0, 0)
+ data = request.content.read()
+ tmi = TTransport.TMemoryBuffer(data)
+ tmo = TTransport.TMemoryBuffer()
+
+ iprot = self.inputProtocolFactory.getProtocol(tmi)
+ oprot = self.outputProtocolFactory.getProtocol(tmo)
+
+ d = self.processor.process(iprot, oprot)
+ d.addCallback(self._cbProcess, request, tmo)
+ return server.NOT_DONE_YET
diff --git a/module/lib/thrift/transport/__init__.py b/module/lib/thrift/transport/__init__.py
new file mode 100644
index 000000000..02c6048a9
--- /dev/null
+++ b/module/lib/thrift/transport/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+__all__ = ['TTransport', 'TSocket', 'THttpClient']