From 8049ddfe26c64e48e3bd727ba8c989d71906cfcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C3=85strand=20=28astrand=29?= Date: Tue, 17 Dec 2013 13:56:20 +0100 Subject: [PATCH] Cherry-picked 4e3388964af9697496d2c5e000bb8559dfba87ad from astrand/websockify: Prepare for fixing https://github.com/kanaka/websockify/issues/71: Move around functions and methods, so that connection-related and server-related stuff are close together. This patch just moves things around - it does not change anything at all. This can be verified with: git diff websocket.py | grep ^- | cut -c 2- | sort > removed git diff websocket.py | grep ^+ | cut -c 2- | sort > added diff -u removed added --- websockify/websocket.py | 688 ++++++++++++++++++++-------------------- 1 file changed, 344 insertions(+), 344 deletions(-) diff --git a/websockify/websocket.py b/websockify/websocket.py index add0337..d264273 100644 --- a/websockify/websocket.py +++ b/websockify/websocket.py @@ -74,20 +74,14 @@ class WebSocketServer(object): log_prefix = "websocket" buffer_size = 65536 + GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + server_handshake_hybi = """HTTP/1.1 101 Switching Protocols\r Upgrade: websocket\r Connection: Upgrade\r Sec-WebSocket-Accept: %s\r """ - GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - - policy_response = """\n""" - - # An exception before the WebSocket connection was established - class EClose(Exception): - pass - # An exception while the WebSocket client was connected class CClose(Exception): pass @@ -95,6 +89,318 @@ Sec-WebSocket-Accept: %s\r class Terminate(Exception): pass + + @staticmethod + def unmask(buf, hlen, plen): + pstart = hlen + 4 + pend = pstart + plen + if numpy: + b = c = s2b('') + if plen >= 4: + mask = numpy.frombuffer(buf, dtype=numpy.dtype('BB', b1, payload_len) + elif payload_len > 125 and payload_len < 65536: + header = pack('>BBH', b1, 126, payload_len) + elif payload_len >= 65536: + header = pack('>BBQ', b1, 127, payload_len) + + #self.msg("Encoded: %s", repr(header + buf)) + + return header + buf, len(header), 0 + + @staticmethod + def decode_hybi(buf, base64=False, logger=None): + """ Decode HyBi style WebSocket packets. + Returns: + {'fin' : 0_or_1, + 'opcode' : number, + 'masked' : boolean, + 'hlen' : header_bytes_number, + 'length' : payload_bytes_number, + 'payload' : decoded_buffer, + 'left' : bytes_left_number, + 'close_code' : number, + 'close_reason' : string} + """ + + f = {'fin' : 0, + 'opcode' : 0, + 'masked' : False, + 'hlen' : 2, + 'length' : 0, + 'payload' : None, + 'left' : 0, + 'close_code' : 1000, + 'close_reason' : ''} + + if logger is None: + logger = WebSocketServer.get_logger() + + blen = len(buf) + f['left'] = blen + + if blen < f['hlen']: + return f # Incomplete frame header + + b1, b2 = unpack_from(">BB", buf) + f['opcode'] = b1 & 0x0f + f['fin'] = (b1 & 0x80) >> 7 + f['masked'] = (b2 & 0x80) >> 7 + + f['length'] = b2 & 0x7f + + if f['length'] == 126: + f['hlen'] = 4 + if blen < f['hlen']: + return f # Incomplete frame header + (f['length'],) = unpack_from('>xxH', buf) + elif f['length'] == 127: + f['hlen'] = 10 + if blen < f['hlen']: + return f # Incomplete frame header + (f['length'],) = unpack_from('>xxQ', buf) + + full_len = f['hlen'] + f['masked'] * 4 + f['length'] + + if blen < full_len: # Incomplete frame + return f # Incomplete frame header + + # Number of bytes that are part of the next frame(s) + f['left'] = blen - full_len + + # Process 1 frame + if f['masked']: + # unmask payload + f['payload'] = WebSocketServer.unmask(buf, f['hlen'], + f['length']) + else: + logger.debug("Unmasked frame: %s" % repr(buf)) + f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len] + + if base64 and f['opcode'] in [1, 2]: + try: + f['payload'] = b64decode(f['payload']) + except: + logger.exception("Exception while b64decoding buffer: %s" % + (repr(buf))) + raise + + if f['opcode'] == 0x08: + if f['length'] >= 2: + f['close_code'] = unpack_from(">H", f['payload'])[0] + if f['length'] > 3: + f['close_reason'] = f['payload'][2:] + + return f + + + # + # Main WebSocketServer methods + # + + def send_frames(self, bufs=None): + """ Encode and send WebSocket frames. Any frames already + queued will be sent first. If buf is not set then only queued + frames will be sent. Returns the number of pending frames that + could not be fully sent. If returned pending frames is greater + than 0, then the caller should call again when the socket is + ready. """ + + tdelta = int(time.time()*1000) - self.start_time + + if bufs: + for buf in bufs: + if self.base64: + encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=1, base64=True) + else: + encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=2, base64=False) + + if self.rec: + self.rec.write("%s,\n" % + repr("{%s{" % tdelta + + encbuf[lenhead:len(encbuf)-lentail])) + + self.send_parts.append(encbuf) + + while self.send_parts: + # Send pending frames + buf = self.send_parts.pop(0) + sent = self.request.send(buf) + + if sent == len(buf): + self.print_traffic("<") + else: + self.print_traffic("<.") + self.send_parts.insert(0, buf[sent:]) + break + + return len(self.send_parts) + + def recv_frames(self): + """ Receive and decode WebSocket frames. + + Returns: + (bufs_list, closed_string) + """ + + closed = False + bufs = [] + tdelta = int(time.time()*1000) - self.start_time + + buf = self.request.recv(self.buffer_size) + if len(buf) == 0: + closed = {'code': 1000, 'reason': "Client closed abruptly"} + return bufs, closed + + if self.recv_part: + # Add partially received frames to current read buffer + buf = self.recv_part + buf + self.recv_part = None + + while buf: + frame = self.decode_hybi(buf, base64=self.base64, + logger=self.logger) + #self.msg("Received buf: %s, frame: %s", repr(buf), frame) + + if frame['payload'] == None: + # Incomplete/partial frame + self.print_traffic("}.") + if frame['left'] > 0: + self.recv_part = buf[-frame['left']:] + break + else: + if frame['opcode'] == 0x8: # connection close + closed = {'code': frame['close_code'], + 'reason': frame['close_reason']} + break + + self.print_traffic("}") + + if self.rec: + start = frame['hlen'] + end = frame['hlen'] + frame['length'] + if frame['masked']: + recbuf = WebSocketServer.unmask(buf, frame['hlen'], + frame['length']) + else: + recbuf = buf[frame['hlen']:frame['hlen'] + + frame['length']] + self.rec.write("%s,\n" % + repr("}%s}" % tdelta + recbuf)) + + + bufs.append(frame['payload']) + + if frame['left']: + buf = buf[-frame['left']:] + else: + buf = '' + + return bufs, closed + + def send_close(self, code=1000, reason=''): + """ Send a WebSocket orderly close frame. """ + + msg = pack(">H%ds" % len(reason), code, reason) + buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False) + self.request.send(buf) + + def do_websocket_handshake(self, headers, path): + h = self.headers = headers + self.path = path + + prot = 'WebSocket-Protocol' + protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',') + + ver = h.get('Sec-WebSocket-Version') + if ver: + # HyBi/IETF version of the protocol + + # HyBi-07 report version 7 + # HyBi-08 - HyBi-12 report version 8 + # HyBi-13 reports version 13 + if ver in ['7', '8', '13']: + self.version = "hybi-%02d" % int(ver) + else: + raise self.EClose('Unsupported protocol version %s' % ver) + + key = h['Sec-WebSocket-Key'] + + # Choose binary if client supports it + if 'binary' in protocols: + self.base64 = False + elif 'base64' in protocols: + self.base64 = True + else: + raise self.EClose("Client must support 'binary' or 'base64' protocol") + + # Generate the hash value for the accept header + accept = b64encode(sha1(s2b(key + self.GUID)).digest()) + + response = self.server_handshake_hybi % b2s(accept) + if self.base64: + response += "Sec-WebSocket-Protocol: base64\r\n" + else: + response += "Sec-WebSocket-Protocol: binary\r\n" + response += "\r\n" + + else: + raise self.EClose("Missing Sec-WebSocket-Version header. Hixie protocols not supported.") + + return response + + def new_websocket_client(self): + """ Do something with a WebSockets client connection. """ + raise("WebSocketServer.new_websocket_client() must be overloaded") + + policy_response = """\n""" + + # An exception before the WebSocket connection was established + class EClose(Exception): + pass + def __init__(self, listen_host='', listen_port=None, source_is_ipv6=False, verbose=False, cert='', key='', ssl_only=None, daemon=False, record='', web='', @@ -270,338 +576,6 @@ Sec-WebSocket-Accept: %s\r os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno()) - @staticmethod - def unmask(buf, hlen, plen): - pstart = hlen + 4 - pend = pstart + plen - if numpy: - b = c = s2b('') - if plen >= 4: - mask = numpy.frombuffer(buf, dtype=numpy.dtype('BB', b1, payload_len) - elif payload_len > 125 and payload_len < 65536: - header = pack('>BBH', b1, 126, payload_len) - elif payload_len >= 65536: - header = pack('>BBQ', b1, 127, payload_len) - - #self.msg("Encoded: %s", repr(header + buf)) - - return header + buf, len(header), 0 - - @staticmethod - def decode_hybi(buf, base64=False, logger=None): - """ Decode HyBi style WebSocket packets. - Returns: - {'fin' : 0_or_1, - 'opcode' : number, - 'masked' : boolean, - 'hlen' : header_bytes_number, - 'length' : payload_bytes_number, - 'payload' : decoded_buffer, - 'left' : bytes_left_number, - 'close_code' : number, - 'close_reason' : string} - """ - - f = {'fin' : 0, - 'opcode' : 0, - 'masked' : False, - 'hlen' : 2, - 'length' : 0, - 'payload' : None, - 'left' : 0, - 'close_code' : 1000, - 'close_reason' : ''} - - if logger is None: - logger = WebSocketServer.get_logger() - - blen = len(buf) - f['left'] = blen - - if blen < f['hlen']: - return f # Incomplete frame header - - b1, b2 = unpack_from(">BB", buf) - f['opcode'] = b1 & 0x0f - f['fin'] = (b1 & 0x80) >> 7 - f['masked'] = (b2 & 0x80) >> 7 - - f['length'] = b2 & 0x7f - - if f['length'] == 126: - f['hlen'] = 4 - if blen < f['hlen']: - return f # Incomplete frame header - (f['length'],) = unpack_from('>xxH', buf) - elif f['length'] == 127: - f['hlen'] = 10 - if blen < f['hlen']: - return f # Incomplete frame header - (f['length'],) = unpack_from('>xxQ', buf) - - full_len = f['hlen'] + f['masked'] * 4 + f['length'] - - if blen < full_len: # Incomplete frame - return f # Incomplete frame header - - # Number of bytes that are part of the next frame(s) - f['left'] = blen - full_len - - # Process 1 frame - if f['masked']: - # unmask payload - f['payload'] = WebSocketServer.unmask(buf, f['hlen'], - f['length']) - else: - logger.debug("Unmasked frame: %s" % repr(buf)) - f['payload'] = buf[(f['hlen'] + f['masked'] * 4):full_len] - - if base64 and f['opcode'] in [1, 2]: - try: - f['payload'] = b64decode(f['payload']) - except: - logger.exception("Exception while b64decoding buffer: %s" % - (repr(buf))) - raise - - if f['opcode'] == 0x08: - if f['length'] >= 2: - f['close_code'] = unpack_from(">H", f['payload'])[0] - if f['length'] > 3: - f['close_reason'] = f['payload'][2:] - - return f - - - # - # WebSocketServer logging/output functions - # - - def print_traffic(self, token="."): - """ Show traffic flow mode. """ - if self.traffic: - sys.stdout.write(token) - sys.stdout.flush() - - - def log(self, lvl, msg, *args, **kwargs): - """ Wrapper around python logging """ - prefix = "" - if self.i_am_client: - prefix = "% 3d: " % self.handler_id - self.logger.log(lvl, "%s%s" % (prefix, msg), - *args, **kwargs) - - def msg(self, *args, **kwargs): - """ Output message with handler_id prefix. """ - self.log(logging.INFO, *args, **kwargs) - - def vmsg(self, *args, **kwargs): - """ Same as msg() but as debug. """ - self.log(logging.DEBUG, *args, **kwargs) - - def warn(self, *args, **kwargs): - """ Same as msg() but as warning. """ - self.log(logging.WARN, *args, **kwargs) - - # - # Main WebSocketServer methods - # - def send_frames(self, bufs=None): - """ Encode and send WebSocket frames. Any frames already - queued will be sent first. If buf is not set then only queued - frames will be sent. Returns the number of pending frames that - could not be fully sent. If returned pending frames is greater - than 0, then the caller should call again when the socket is - ready. """ - - tdelta = int(time.time()*1000) - self.start_time - - if bufs: - for buf in bufs: - if self.base64: - encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=1, base64=True) - else: - encbuf, lenhead, lentail = self.encode_hybi(buf, opcode=2, base64=False) - - if self.rec: - self.rec.write("%s,\n" % - repr("{%s{" % tdelta - + encbuf[lenhead:len(encbuf)-lentail])) - - self.send_parts.append(encbuf) - - while self.send_parts: - # Send pending frames - buf = self.send_parts.pop(0) - sent = self.request.send(buf) - - if sent == len(buf): - self.print_traffic("<") - else: - self.print_traffic("<.") - self.send_parts.insert(0, buf[sent:]) - break - - return len(self.send_parts) - - def recv_frames(self): - """ Receive and decode WebSocket frames. - - Returns: - (bufs_list, closed_string) - """ - - closed = False - bufs = [] - tdelta = int(time.time()*1000) - self.start_time - - buf = self.request.recv(self.buffer_size) - if len(buf) == 0: - closed = {'code': 1000, 'reason': "Client closed abruptly"} - return bufs, closed - - if self.recv_part: - # Add partially received frames to current read buffer - buf = self.recv_part + buf - self.recv_part = None - - while buf: - frame = self.decode_hybi(buf, base64=self.base64, - logger=self.logger) - #self.msg("Received buf: %s, frame: %s", repr(buf), frame) - - if frame['payload'] == None: - # Incomplete/partial frame - self.print_traffic("}.") - if frame['left'] > 0: - self.recv_part = buf[-frame['left']:] - break - else: - if frame['opcode'] == 0x8: # connection close - closed = {'code': frame['close_code'], - 'reason': frame['close_reason']} - break - - self.print_traffic("}") - - if self.rec: - start = frame['hlen'] - end = frame['hlen'] + frame['length'] - if frame['masked']: - recbuf = WebSocketServer.unmask(buf, frame['hlen'], - frame['length']) - else: - recbuf = buf[frame['hlen']:frame['hlen'] + - frame['length']] - self.rec.write("%s,\n" % - repr("}%s}" % tdelta + recbuf)) - - - bufs.append(frame['payload']) - - if frame['left']: - buf = buf[-frame['left']:] - else: - buf = '' - - return bufs, closed - - def send_close(self, code=1000, reason=''): - """ Send a WebSocket orderly close frame. """ - - msg = pack(">H%ds" % len(reason), code, reason) - buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False) - self.request.send(buf) - - def do_websocket_handshake(self, headers, path): - h = self.headers = headers - self.path = path - - prot = 'WebSocket-Protocol' - protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',') - - ver = h.get('Sec-WebSocket-Version') - if ver: - # HyBi/IETF version of the protocol - - # HyBi-07 report version 7 - # HyBi-08 - HyBi-12 report version 8 - # HyBi-13 reports version 13 - if ver in ['7', '8', '13']: - self.version = "hybi-%02d" % int(ver) - else: - raise self.EClose('Unsupported protocol version %s' % ver) - - key = h['Sec-WebSocket-Key'] - - # Choose binary if client supports it - if 'binary' in protocols: - self.base64 = False - elif 'base64' in protocols: - self.base64 = True - else: - raise self.EClose("Client must support 'binary' or 'base64' protocol") - - # Generate the hash value for the accept header - accept = b64encode(sha1(s2b(key + self.GUID)).digest()) - - response = self.server_handshake_hybi % b2s(accept) - if self.base64: - response += "Sec-WebSocket-Protocol: base64\r\n" - else: - response += "Sec-WebSocket-Protocol: binary\r\n" - response += "\r\n" - - else: - raise self.EClose("Missing Sec-WebSocket-Version header. Hixie protocols not supported.") - - return response - - def do_handshake(self, sock, address): """ do_handshake does the following: @@ -704,6 +678,36 @@ Sec-WebSocket-Accept: %s\r # Return the WebSockets socket which may be SSL wrapped return retsock + # + # WebSocketServer logging/output functions + # + def print_traffic(self, token="."): + """ Show traffic flow mode. """ + if self.traffic: + sys.stdout.write(token) + sys.stdout.flush() + + + def log(self, lvl, msg, *args, **kwargs): + """ Wrapper around python logging """ + prefix = "" + if self.i_am_client: + prefix = "% 3d: " % self.handler_id + self.logger.log(lvl, "%s%s" % (prefix, msg), + *args, **kwargs) + + def msg(self, *args, **kwargs): + """ Output message with handler_id prefix. """ + self.log(logging.INFO, *args, **kwargs) + + def vmsg(self, *args, **kwargs): + """ Same as msg() but as debug. """ + self.log(logging.DEBUG, *args, **kwargs) + + def warn(self, *args, **kwargs): + """ Same as msg() but as warning. """ + self.log(logging.WARN, *args, **kwargs) + # # Events that can/should be overridden in sub-classes @@ -797,10 +801,6 @@ Sec-WebSocket-Accept: %s\r # Original socket closed by caller self.request.close() - def new_websocket_client(self): - """ Do something with a WebSockets client connection. """ - raise("WebSocketServer.new_websocket_client() must be overloaded") - def start_server(self): """ Daemonize if requested. Listen for for connections. Run