path: root/module/lib/thrift/transport/
diff options
Diffstat (limited to 'module/lib/thrift/transport/')
1 files changed, 331 insertions, 0 deletions
diff --git a/module/lib/thrift/transport/ b/module/lib/thrift/transport/
new file mode 100644
index 000000000..12e51a9bf
--- /dev/null
+++ b/module/lib/thrift/transport/
@@ -0,0 +1,331 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from cStringIO import StringIO
+from struct import pack,unpack
+from thrift.Thrift import TException
+class TTransportException(TException):
+ """Custom Transport Exception class"""
+ NOT_OPEN = 1
+ def __init__(self, type=UNKNOWN, message=None):
+ TException.__init__(self, message)
+ self.type = type
+class TTransportBase:
+ """Base class for Thrift transport layer."""
+ def isOpen(self):
+ pass
+ def open(self):
+ pass
+ def close(self):
+ pass
+ def read(self, sz):
+ pass
+ def readAll(self, sz):
+ buff = ''
+ have = 0
+ while (have < sz):
+ chunk =
+ have += len(chunk)
+ buff += chunk
+ if len(chunk) == 0:
+ raise EOFError()
+ return buff
+ def write(self, buf):
+ pass
+ def flush(self):
+ pass
+# This class should be thought of as an interface.
+class CReadableTransport:
+ """base class for transports that are readable from C"""
+ # TODO(dreiss): Think about changing this interface to allow us to use
+ # a (Python, not c) StringIO instead, because it allows
+ # you to write after reading.
+ # NOTE: This is a classic class, so properties will NOT work
+ # correctly for setting.
+ @property
+ def cstringio_buf(self):
+ """A cStringIO buffer that contains the current chunk we are reading."""
+ pass
+ def cstringio_refill(self, partialread, reqlen):
+ """Refills cstringio_buf.
+ Returns the currently used buffer (which can but need not be the same as
+ the old cstringio_buf). partialread is what the C code has read from the
+ buffer, and should be inserted into the buffer before any more reads. The
+ return value must be a new, not borrowed reference. Something along the
+ lines of self._buf should be fine.
+ If reqlen bytes can't be read, throw EOFError.
+ """
+ pass
+class TServerTransportBase:
+ """Base class for Thrift server transports."""
+ def listen(self):
+ pass
+ def accept(self):
+ pass
+ def close(self):
+ pass
+class TTransportFactoryBase:
+ """Base class for a Transport Factory"""
+ def getTransport(self, trans):
+ return trans
+class TBufferedTransportFactory:
+ """Factory transport that builds buffered transports"""
+ def getTransport(self, trans):
+ buffered = TBufferedTransport(trans)
+ return buffered
+class TBufferedTransport(TTransportBase,CReadableTransport):
+ """Class that wraps another transport and buffers its I/O.
+ The implementation uses a (configurable) fixed-size read buffer
+ but buffers all writes until a flush is performed.
+ """
+ def __init__(self, trans, rbuf_size = DEFAULT_BUFFER):
+ self.__trans = trans
+ self.__wbuf = StringIO()
+ self.__rbuf = StringIO("")
+ self.__rbuf_size = rbuf_size
+ def isOpen(self):
+ return self.__trans.isOpen()
+ def open(self):
+ return
+ def close(self):
+ return self.__trans.close()
+ def read(self, sz):
+ ret =
+ if len(ret) != 0:
+ return ret
+ self.__rbuf = StringIO(, self.__rbuf_size)))
+ return
+ def write(self, buf):
+ self.__wbuf.write(buf)
+ def flush(self):
+ out = self.__wbuf.getvalue()
+ # reset wbuf before write/flush to preserve state on underlying failure
+ self.__wbuf = StringIO()
+ self.__trans.write(out)
+ self.__trans.flush()
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self.__rbuf
+ def cstringio_refill(self, partialread, reqlen):
+ retstring = partialread
+ if reqlen < self.__rbuf_size:
+ # try to make a read of as much as we can.
+ retstring +=
+ # but make sure we do read reqlen bytes.
+ if len(retstring) < reqlen:
+ retstring += self.__trans.readAll(reqlen - len(retstring))
+ self.__rbuf = StringIO(retstring)
+ return self.__rbuf
+class TMemoryBuffer(TTransportBase, CReadableTransport):
+ """Wraps a cStringIO object as a TTransport.
+ NOTE: Unlike the C++ version of this class, you cannot write to it
+ then immediately read from it. If you want to read from a
+ TMemoryBuffer, you must either pass a string to the constructor.
+ TODO(dreiss): Make this work like the C++ version.
+ """
+ def __init__(self, value=None):
+ """value -- a value to read from for stringio
+ If value is set, this will be a transport for reading,
+ otherwise, it is for writing"""
+ if value is not None:
+ self._buffer = StringIO(value)
+ else:
+ self._buffer = StringIO()
+ def isOpen(self):
+ return not self._buffer.closed
+ def open(self):
+ pass
+ def close(self):
+ self._buffer.close()
+ def read(self, sz):
+ return
+ def write(self, buf):
+ self._buffer.write(buf)
+ def flush(self):
+ pass
+ def getvalue(self):
+ return self._buffer.getvalue()
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self._buffer
+ def cstringio_refill(self, partialread, reqlen):
+ # only one shot at reading...
+ raise EOFError()
+class TFramedTransportFactory:
+ """Factory transport that builds framed transports"""
+ def getTransport(self, trans):
+ framed = TFramedTransport(trans)
+ return framed
+class TFramedTransport(TTransportBase, CReadableTransport):
+ """Class that wraps another transport and frames its I/O when writing."""
+ def __init__(self, trans,):
+ self.__trans = trans
+ self.__rbuf = StringIO()
+ self.__wbuf = StringIO()
+ def isOpen(self):
+ return self.__trans.isOpen()
+ def open(self):
+ return
+ def close(self):
+ return self.__trans.close()
+ def read(self, sz):
+ ret =
+ if len(ret) != 0:
+ return ret
+ self.readFrame()
+ return
+ def readFrame(self):
+ buff = self.__trans.readAll(4)
+ sz, = unpack('!i', buff)
+ self.__rbuf = StringIO(self.__trans.readAll(sz))
+ def write(self, buf):
+ self.__wbuf.write(buf)
+ def flush(self):
+ wout = self.__wbuf.getvalue()
+ wsz = len(wout)
+ # reset wbuf before write/flush to preserve state on underlying failure
+ self.__wbuf = StringIO()
+ # N.B.: Doing this string concatenation is WAY cheaper than making
+ # two separate calls to the underlying socket object. Socket writes in
+ # Python turn out to be REALLY expensive, but it seems to do a pretty
+ # good job of managing string buffer operations without excessive copies
+ buf = pack("!i", wsz) + wout
+ self.__trans.write(buf)
+ self.__trans.flush()
+ # Implement the CReadableTransport interface.
+ @property
+ def cstringio_buf(self):
+ return self.__rbuf
+ def cstringio_refill(self, prefix, reqlen):
+ # self.__rbuf will already be empty here because fastbinary doesn't
+ # ask for a refill until the previous buffer is empty. Therefore,
+ # we can start reading new frames immediately.
+ while len(prefix) < reqlen:
+ self.readFrame()
+ prefix += self.__rbuf.getvalue()
+ self.__rbuf = StringIO(prefix)
+ return self.__rbuf
+class TFileObjectTransport(TTransportBase):
+ """Wraps a file-like object to make it work as a Thrift transport."""
+ def __init__(self, fileobj):
+ self.fileobj = fileobj
+ def isOpen(self):
+ return True
+ def close(self):
+ self.fileobj.close()
+ def read(self, sz):
+ return
+ def write(self, buf):
+ self.fileobj.write(buf)
+ def flush(self):
+ self.fileobj.flush()