summaryrefslogtreecommitdiffstats
path: root/module/lib/mod_pywebsocket/extensions.py
diff options
context:
space:
mode:
authorGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2013-03-11 21:09:59 +0100
committerGravatar RaNaN <Mast3rRaNaN@hotmail.de> 2013-03-11 21:09:59 +0100
commit5e27b8b8ff3e7f4c3f2b8ebe525b716f1c211d12 (patch)
treeacdca4b00fc1a29dc8fec823f5b45cb80cf719b0 /module/lib/mod_pywebsocket/extensions.py
parentbetter time formatting (diff)
downloadpyload-5e27b8b8ff3e7f4c3f2b8ebe525b716f1c211d12.tar.xz
updated pywebsocket
Diffstat (limited to 'module/lib/mod_pywebsocket/extensions.py')
-rw-r--r--module/lib/mod_pywebsocket/extensions.py437
1 files changed, 404 insertions, 33 deletions
diff --git a/module/lib/mod_pywebsocket/extensions.py b/module/lib/mod_pywebsocket/extensions.py
index 52b7a4a19..03dbf9ee1 100644
--- a/module/lib/mod_pywebsocket/extensions.py
+++ b/module/lib/mod_pywebsocket/extensions.py
@@ -38,6 +38,9 @@ _available_processors = {}
class ExtensionProcessorInterface(object):
+ def name(self):
+ return None
+
def get_extension_response(self):
return None
@@ -46,13 +49,21 @@ class ExtensionProcessorInterface(object):
class DeflateStreamExtensionProcessor(ExtensionProcessorInterface):
- """WebSocket DEFLATE stream extension processor."""
+ """WebSocket DEFLATE stream extension processor.
+
+ Specification:
+ Section 9.2.1 in
+ http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10
+ """
def __init__(self, request):
self._logger = util.get_class_logger(self)
self._request = request
+ def name(self):
+ return common.DEFLATE_STREAM_EXTENSION
+
def get_extension_response(self):
if len(self._request.get_parameter_names()) != 0:
return None
@@ -70,8 +81,40 @@ _available_processors[common.DEFLATE_STREAM_EXTENSION] = (
DeflateStreamExtensionProcessor)
+def _log_compression_ratio(logger, original_bytes, total_original_bytes,
+ filtered_bytes, total_filtered_bytes):
+ # Print inf when ratio is not available.
+ ratio = float('inf')
+ average_ratio = float('inf')
+ if original_bytes != 0:
+ ratio = float(filtered_bytes) / original_bytes
+ if total_original_bytes != 0:
+ average_ratio = (
+ float(total_filtered_bytes) / total_original_bytes)
+ logger.debug('Outgoing compress ratio: %f (average: %f)' %
+ (ratio, average_ratio))
+
+
+def _log_decompression_ratio(logger, received_bytes, total_received_bytes,
+ filtered_bytes, total_filtered_bytes):
+ # Print inf when ratio is not available.
+ ratio = float('inf')
+ average_ratio = float('inf')
+ if received_bytes != 0:
+ ratio = float(received_bytes) / filtered_bytes
+ if total_filtered_bytes != 0:
+ average_ratio = (
+ float(total_received_bytes) / total_filtered_bytes)
+ logger.debug('Incoming compress ratio: %f (average: %f)' %
+ (ratio, average_ratio))
+
+
class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
- """WebSocket Per-frame DEFLATE extension processor."""
+ """WebSocket Per-frame DEFLATE extension processor.
+
+ Specification:
+ http://tools.ietf.org/html/draft-tyoshino-hybi-websocket-perframe-deflate
+ """
_WINDOW_BITS_PARAM = 'max_window_bits'
_NO_CONTEXT_TAKEOVER_PARAM = 'no_context_takeover'
@@ -83,6 +126,7 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
self._response_window_bits = None
self._response_no_context_takeover = False
+ self._bfinal = False
# Counters for statistics.
@@ -96,6 +140,9 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
# Total number of incoming bytes obtained after applying this filter.
self._total_filtered_incoming_payload_bytes = 0
+ def name(self):
+ return common.DEFLATE_FRAME_EXTENSION
+
def get_extension_response(self):
# Any unknown parameter will be just ignored.
@@ -173,6 +220,9 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
def set_response_no_context_takeover(self, value):
self._response_no_context_takeover = value
+ def set_bfinal(self, value):
+ self._bfinal = value
+
def enable_outgoing_compression(self):
self._compress_outgoing = True
@@ -193,24 +243,17 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
original_payload_size)
return
- frame.payload = self._deflater.filter(frame.payload)
+ frame.payload = self._deflater.filter(
+ frame.payload, bfinal=self._bfinal)
frame.rsv1 = 1
filtered_payload_size = len(frame.payload)
self._total_filtered_outgoing_payload_bytes += filtered_payload_size
- # Print inf when ratio is not available.
- ratio = float('inf')
- average_ratio = float('inf')
- if original_payload_size != 0:
- ratio = float(filtered_payload_size) / original_payload_size
- if self._total_outgoing_payload_bytes != 0:
- average_ratio = (
- float(self._total_filtered_outgoing_payload_bytes) /
- self._total_outgoing_payload_bytes)
- self._logger.debug(
- 'Outgoing compress ratio: %f (average: %f)' %
- (ratio, average_ratio))
+ _log_compression_ratio(self._logger, original_payload_size,
+ self._total_outgoing_payload_bytes,
+ filtered_payload_size,
+ self._total_filtered_outgoing_payload_bytes)
def _incoming_filter(self, frame):
"""Transform incoming frames. This method is called only by
@@ -231,18 +274,10 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
filtered_payload_size = len(frame.payload)
self._total_filtered_incoming_payload_bytes += filtered_payload_size
- # Print inf when ratio is not available.
- ratio = float('inf')
- average_ratio = float('inf')
- if received_payload_size != 0:
- ratio = float(received_payload_size) / filtered_payload_size
- if self._total_filtered_incoming_payload_bytes != 0:
- average_ratio = (
- float(self._total_incoming_payload_bytes) /
- self._total_filtered_incoming_payload_bytes)
- self._logger.debug(
- 'Incoming compress ratio: %f (average: %f)' %
- (ratio, average_ratio))
+ _log_decompression_ratio(self._logger, received_payload_size,
+ self._total_incoming_payload_bytes,
+ filtered_payload_size,
+ self._total_filtered_incoming_payload_bytes)
_available_processors[common.DEFLATE_FRAME_EXTENSION] = (
@@ -250,7 +285,7 @@ _available_processors[common.DEFLATE_FRAME_EXTENSION] = (
# Adding vendor-prefixed deflate-frame extension.
-# TODO(bashi): Remove this after WebKit stops using vender prefix.
+# TODO(bashi): Remove this after WebKit stops using vendor prefix.
_available_processors[common.X_WEBKIT_DEFLATE_FRAME_EXTENSION] = (
DeflateFrameExtensionProcessor)
@@ -270,21 +305,22 @@ def _create_accepted_method_desc(method_name, method_params):
return common.format_extension(extension)
-class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface):
- """WebSocket Per-frame compression extension processor."""
+class CompressionExtensionProcessorBase(ExtensionProcessorInterface):
+ """Base class for Per-frame and Per-message compression extension."""
_METHOD_PARAM = 'method'
- _DEFLATE_METHOD = 'deflate'
def __init__(self, request):
self._logger = util.get_class_logger(self)
self._request = request
self._compression_method_name = None
self._compression_processor = None
+ self._compression_processor_hook = None
+
+ def name(self):
+ return ''
def _lookup_compression_processor(self, method_desc):
- if method_desc.name() == self._DEFLATE_METHOD:
- return DeflateFrameExtensionProcessor(method_desc)
return None
def _get_compression_processor_response(self):
@@ -311,6 +347,10 @@ class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface):
break
if compression_processor is None:
return None
+
+ if self._compression_processor_hook:
+ self._compression_processor_hook(compression_processor)
+
processor_response = compression_processor.get_extension_response()
if processor_response is None:
return None
@@ -337,14 +377,345 @@ class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface):
return
self._compression_processor.setup_stream_options(stream_options)
+ def set_compression_processor_hook(self, hook):
+ self._compression_processor_hook = hook
+
def get_compression_processor(self):
return self._compression_processor
+class PerFrameCompressionExtensionProcessor(CompressionExtensionProcessorBase):
+ """WebSocket Per-frame compression extension processor.
+
+ Specification:
+ http://tools.ietf.org/html/draft-ietf-hybi-websocket-perframe-compression
+ """
+
+ _DEFLATE_METHOD = 'deflate'
+
+ def __init__(self, request):
+ CompressionExtensionProcessorBase.__init__(self, request)
+
+ def name(self):
+ return common.PERFRAME_COMPRESSION_EXTENSION
+
+ def _lookup_compression_processor(self, method_desc):
+ if method_desc.name() == self._DEFLATE_METHOD:
+ return DeflateFrameExtensionProcessor(method_desc)
+ return None
+
+
_available_processors[common.PERFRAME_COMPRESSION_EXTENSION] = (
PerFrameCompressionExtensionProcessor)
+class DeflateMessageProcessor(ExtensionProcessorInterface):
+ """Per-message deflate processor."""
+
+ _S2C_MAX_WINDOW_BITS_PARAM = 's2c_max_window_bits'
+ _S2C_NO_CONTEXT_TAKEOVER_PARAM = 's2c_no_context_takeover'
+ _C2S_MAX_WINDOW_BITS_PARAM = 'c2s_max_window_bits'
+ _C2S_NO_CONTEXT_TAKEOVER_PARAM = 'c2s_no_context_takeover'
+
+ def __init__(self, request):
+ self._request = request
+ self._logger = util.get_class_logger(self)
+
+ self._c2s_max_window_bits = None
+ self._c2s_no_context_takeover = False
+ self._bfinal = False
+
+ self._compress_outgoing_enabled = False
+
+ # True if a message is fragmented and compression is ongoing.
+ self._compress_ongoing = False
+
+ # Counters for statistics.
+
+ # Total number of outgoing bytes supplied to this filter.
+ self._total_outgoing_payload_bytes = 0
+ # Total number of bytes sent to the network after applying this filter.
+ self._total_filtered_outgoing_payload_bytes = 0
+
+ # Total number of bytes received from the network.
+ self._total_incoming_payload_bytes = 0
+ # Total number of incoming bytes obtained after applying this filter.
+ self._total_filtered_incoming_payload_bytes = 0
+
+ def name(self):
+ return 'deflate'
+
+ def get_extension_response(self):
+ # Any unknown parameter will be just ignored.
+
+ s2c_max_window_bits = self._request.get_parameter_value(
+ self._S2C_MAX_WINDOW_BITS_PARAM)
+ if s2c_max_window_bits is not None:
+ try:
+ s2c_max_window_bits = int(s2c_max_window_bits)
+ except ValueError, e:
+ return None
+ if s2c_max_window_bits < 8 or s2c_max_window_bits > 15:
+ return None
+
+ s2c_no_context_takeover = self._request.has_parameter(
+ self._S2C_NO_CONTEXT_TAKEOVER_PARAM)
+ if (s2c_no_context_takeover and
+ self._request.get_parameter_value(
+ self._S2C_NO_CONTEXT_TAKEOVER_PARAM) is not None):
+ return None
+
+ self._deflater = util._RFC1979Deflater(
+ s2c_max_window_bits, s2c_no_context_takeover)
+
+ self._inflater = util._RFC1979Inflater()
+
+ self._compress_outgoing_enabled = True
+
+ response = common.ExtensionParameter(self._request.name())
+
+ if s2c_max_window_bits is not None:
+ response.add_parameter(
+ self._S2C_MAX_WINDOW_BITS_PARAM, str(s2c_max_window_bits))
+
+ if s2c_no_context_takeover:
+ response.add_parameter(
+ self._S2C_NO_CONTEXT_TAKEOVER_PARAM, None)
+
+ if self._c2s_max_window_bits is not None:
+ response.add_parameter(
+ self._C2S_MAX_WINDOW_BITS_PARAM,
+ str(self._c2s_max_window_bits))
+ if self._c2s_no_context_takeover:
+ response.add_parameter(
+ self._C2S_NO_CONTEXT_TAKEOVER_PARAM, None)
+
+ self._logger.debug(
+ 'Enable %s extension ('
+ 'request: s2c_max_window_bits=%s; s2c_no_context_takeover=%r, '
+ 'response: c2s_max_window_bits=%s; c2s_no_context_takeover=%r)' %
+ (self._request.name(),
+ s2c_max_window_bits,
+ s2c_no_context_takeover,
+ self._c2s_max_window_bits,
+ self._c2s_no_context_takeover))
+
+ return response
+
+ def setup_stream_options(self, stream_options):
+ class _OutgoingMessageFilter(object):
+
+ def __init__(self, parent):
+ self._parent = parent
+
+ def filter(self, message, end=True, binary=False):
+ return self._parent._process_outgoing_message(
+ message, end, binary)
+
+ class _IncomingMessageFilter(object):
+
+ def __init__(self, parent):
+ self._parent = parent
+ self._decompress_next_message = False
+
+ def decompress_next_message(self):
+ self._decompress_next_message = True
+
+ def filter(self, message):
+ message = self._parent._process_incoming_message(
+ message, self._decompress_next_message)
+ self._decompress_next_message = False
+ return message
+
+ self._outgoing_message_filter = _OutgoingMessageFilter(self)
+ self._incoming_message_filter = _IncomingMessageFilter(self)
+ stream_options.outgoing_message_filters.append(
+ self._outgoing_message_filter)
+ stream_options.incoming_message_filters.append(
+ self._incoming_message_filter)
+
+ class _OutgoingFrameFilter(object):
+
+ def __init__(self, parent):
+ self._parent = parent
+ self._set_compression_bit = False
+
+ def set_compression_bit(self):
+ self._set_compression_bit = True
+
+ def filter(self, frame):
+ self._parent._process_outgoing_frame(
+ frame, self._set_compression_bit)
+ self._set_compression_bit = False
+
+ class _IncomingFrameFilter(object):
+
+ def __init__(self, parent):
+ self._parent = parent
+
+ def filter(self, frame):
+ self._parent._process_incoming_frame(frame)
+
+ self._outgoing_frame_filter = _OutgoingFrameFilter(self)
+ self._incoming_frame_filter = _IncomingFrameFilter(self)
+ stream_options.outgoing_frame_filters.append(
+ self._outgoing_frame_filter)
+ stream_options.incoming_frame_filters.append(
+ self._incoming_frame_filter)
+
+ stream_options.encode_text_message_to_utf8 = False
+
+ def set_c2s_max_window_bits(self, value):
+ self._c2s_max_window_bits = value
+
+ def set_c2s_no_context_takeover(self, value):
+ self._c2s_no_context_takeover = value
+
+ def set_bfinal(self, value):
+ self._bfinal = value
+
+ def enable_outgoing_compression(self):
+ self._compress_outgoing_enabled = True
+
+ def disable_outgoing_compression(self):
+ self._compress_outgoing_enabled = False
+
+ def _process_incoming_message(self, message, decompress):
+ if not decompress:
+ return message
+
+ received_payload_size = len(message)
+ self._total_incoming_payload_bytes += received_payload_size
+
+ message = self._inflater.filter(message)
+
+ filtered_payload_size = len(message)
+ self._total_filtered_incoming_payload_bytes += filtered_payload_size
+
+ _log_decompression_ratio(self._logger, received_payload_size,
+ self._total_incoming_payload_bytes,
+ filtered_payload_size,
+ self._total_filtered_incoming_payload_bytes)
+
+ return message
+
+ def _process_outgoing_message(self, message, end, binary):
+ if not binary:
+ message = message.encode('utf-8')
+
+ if not self._compress_outgoing_enabled:
+ return message
+
+ original_payload_size = len(message)
+ self._total_outgoing_payload_bytes += original_payload_size
+
+ message = self._deflater.filter(
+ message, flush=end, bfinal=self._bfinal)
+
+ filtered_payload_size = len(message)
+ self._total_filtered_outgoing_payload_bytes += filtered_payload_size
+
+ _log_compression_ratio(self._logger, original_payload_size,
+ self._total_outgoing_payload_bytes,
+ filtered_payload_size,
+ self._total_filtered_outgoing_payload_bytes)
+
+ if not self._compress_ongoing:
+ self._outgoing_frame_filter.set_compression_bit()
+ self._compress_ongoing = not end
+ return message
+
+ def _process_incoming_frame(self, frame):
+ if frame.rsv1 == 1 and not common.is_control_opcode(frame.opcode):
+ self._incoming_message_filter.decompress_next_message()
+ frame.rsv1 = 0
+
+ def _process_outgoing_frame(self, frame, compression_bit):
+ if (not compression_bit or
+ common.is_control_opcode(frame.opcode)):
+ return
+
+ frame.rsv1 = 1
+
+
+class PerMessageCompressionExtensionProcessor(
+ CompressionExtensionProcessorBase):
+ """WebSocket Per-message compression extension processor.
+
+ Specification:
+ http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression
+ """
+
+ _DEFLATE_METHOD = 'deflate'
+
+ def __init__(self, request):
+ CompressionExtensionProcessorBase.__init__(self, request)
+
+ def name(self):
+ return common.PERMESSAGE_COMPRESSION_EXTENSION
+
+ def _lookup_compression_processor(self, method_desc):
+ if method_desc.name() == self._DEFLATE_METHOD:
+ return DeflateMessageProcessor(method_desc)
+ return None
+
+
+_available_processors[common.PERMESSAGE_COMPRESSION_EXTENSION] = (
+ PerMessageCompressionExtensionProcessor)
+
+
+# Adding vendor-prefixed permessage-compress extension.
+# TODO(bashi): Remove this after WebKit stops using vendor prefix.
+_available_processors[common.X_WEBKIT_PERMESSAGE_COMPRESSION_EXTENSION] = (
+ PerMessageCompressionExtensionProcessor)
+
+
+class MuxExtensionProcessor(ExtensionProcessorInterface):
+ """WebSocket multiplexing extension processor."""
+
+ _QUOTA_PARAM = 'quota'
+
+ def __init__(self, request):
+ self._request = request
+
+ def name(self):
+ return common.MUX_EXTENSION
+
+ def get_extension_response(self, ws_request,
+ logical_channel_extensions):
+ # Mux extension cannot be used after extensions that depend on
+ # frame boundary, extension data field, or any reserved bits
+ # which are attributed to each frame.
+ for extension in logical_channel_extensions:
+ name = extension.name()
+ if (name == common.PERFRAME_COMPRESSION_EXTENSION or
+ name == common.DEFLATE_FRAME_EXTENSION or
+ name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION):
+ return None
+
+ quota = self._request.get_parameter_value(self._QUOTA_PARAM)
+ if quota is None:
+ ws_request.mux_quota = 0
+ else:
+ try:
+ quota = int(quota)
+ except ValueError, e:
+ return None
+ if quota < 0 or quota >= 2 ** 32:
+ return None
+ ws_request.mux_quota = quota
+
+ ws_request.mux = True
+ ws_request.mux_extensions = logical_channel_extensions
+ return common.ExtensionParameter(common.MUX_EXTENSION)
+
+ def setup_stream_options(self, stream_options):
+ pass
+
+
+_available_processors[common.MUX_EXTENSION] = MuxExtensionProcessor
+
+
def get_extension_processor(extension_request):
global _available_processors
processor_class = _available_processors.get(extension_request.name())