Fix flake8 errors related to whitespace

This commit is contained in:
Takashi Kajinami 2025-06-08 22:38:15 +09:00
parent 9e1c731089
commit 9140ab7a7d
11 changed files with 96 additions and 96 deletions

View File

@ -11,7 +11,7 @@ as taken from http://docs.python.org/dev/library/ssl.html#certificates
'''
import os, sys, select, optparse, logging
sys.path.insert(0,os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from websockify.websockifyserver import WebSockifyServer, WebSockifyRequestHandler

View File

@ -5,7 +5,7 @@ import sys
import optparse
import select
sys.path.insert(0,os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from websockify.websocket import WebSocket, \
WebSocketWantReadError, WebSocketWantWriteError

View File

@ -7,7 +7,7 @@ given a sequence number. Any errors are reported and counted.
'''
import sys, os, select, random, time, optparse, logging
sys.path.insert(0,os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from websockify.websockifyserver import WebSockifyServer, WebSockifyRequestHandler
@ -74,13 +74,13 @@ class WebSocketLoad(WebSockifyRequestHandler):
def generate(self):
length = random.randint(10, self.max_packet_size)
numlist = self.rand_array[self.max_packet_size-length:]
numlist = self.rand_array[self.max_packet_size - length:]
# Error in length
#numlist.append(5)
chksum = sum(numlist)
# Error in checksum
#numlist[0] = 5
nums = "".join( [str(n) for n in numlist] )
nums = "".join([str(n) for n in numlist])
data = "^%d:%d:%d:%s$" % (self.send_cnt, length, chksum, nums)
self.send_cnt += 1
@ -102,7 +102,7 @@ class WebSocketLoad(WebSockifyRequestHandler):
try:
cnt, length, chksum, nums = data[1:-1].split(':')
cnt = int(cnt)
cnt = int(cnt)
length = int(length)
chksum = int(chksum)
except ValueError:
@ -151,7 +151,7 @@ if __name__ == '__main__':
if len(args) != 1: raise ValueError
opts.listen_port = int(args[0])
if len(args) not in [1,2]: raise ValueError
if len(args) not in [1, 2]: raise ValueError
opts.listen_port = int(args[0])
if len(args) == 2:
opts.delay = int(args[1])

View File

@ -114,7 +114,7 @@ class JWSTokenTestCase(unittest.TestCase):
key = jwk.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200})
jwt_token.make_signed_token(key)
mock_time.return_value = 150
@ -131,7 +131,7 @@ class JWSTokenTestCase(unittest.TestCase):
key = jwk.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200})
jwt_token.make_signed_token(key)
mock_time.return_value = 50
@ -146,7 +146,7 @@ class JWSTokenTestCase(unittest.TestCase):
key = jwk.JWK()
private_key = open("./tests/fixtures/private.pem", "rb").read()
key.import_from_pem(private_key)
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200 })
jwt_token = jwt.JWT({"alg": "RS256"}, {'host': "remote_host", 'port': "remote_port", 'nbf': 100, 'exp': 200})
jwt_token.make_signed_token(key)
mock_time.return_value = 250
@ -159,7 +159,7 @@ class JWSTokenTestCase(unittest.TestCase):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwk.JWK()
key.import_key(kty="oct",k=secret)
key.import_key(kty="oct", k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)
@ -174,7 +174,7 @@ class JWSTokenTestCase(unittest.TestCase):
secret = open("./tests/fixtures/symmetric.key").read()
key = jwk.JWK()
key.import_key(kty="oct",k=secret)
key.import_key(kty="oct", k=secret)
jwt_token = jwt.JWT({"alg": "HS256"}, {'host': "remote_host", 'port': "remote_port"})
jwt_token.make_signed_token(key)

View File

@ -313,7 +313,7 @@ class WebSockifyServerTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
pass
server = self._get_server(handler_class=FakeHandler, daemon=True,
server = self._get_server(handler_class=FakeHandler, daemon=True,
idle_timeout=1, ssl_ciphers=test_ciphers)
sock = FakeSocket(b"\x16some ssl data")
@ -354,7 +354,7 @@ class WebSockifyServerTestCase(unittest.TestCase):
def __init__(self, *args, **kwargs):
pass
server = self._get_server(handler_class=FakeHandler, daemon=True,
server = self._get_server(handler_class=FakeHandler, daemon=True,
idle_timeout=1, ssl_options=test_options)
sock = FakeSocket(b"\x16some ssl data")

View File

@ -13,7 +13,7 @@ class WebsockifySysLogHandler(handlers.SysLogHandler):
_legacy = False
_timestamp_fmt = '%Y-%m-%dT%H:%M:%SZ'
_max_hostname = 255
_max_ident = 24 #safer for old daemons
_max_ident = 24 # safer for old daemons
_send_length = False
_tail = '\n'
ident = None
@ -54,7 +54,7 @@ class WebsockifySysLogHandler(handlers.SysLogHandler):
try:
# Gather info.
text = self.format(record).replace(self._tail, ' ')
if not text: # nothing to log
if not text: # nothing to log
return
pri = self.encodePriority(self.facility,
@ -69,7 +69,7 @@ class WebsockifySysLogHandler(handlers.SysLogHandler):
else:
ident = ''
pid = os.getpid() # shouldn't need truncation
pid = os.getpid() # shouldn't need truncation
# Format the header.
head = {

View File

@ -114,8 +114,8 @@ class BaseTokenAPI(BasePlugin):
def process_result(self, resp):
host, port = resp.text.split(':')
port = port.encode('ascii','ignore')
return [ host, port ]
port = port.encode('ascii', 'ignore')
return [host, port]
def lookup(self, token):
import requests
@ -159,7 +159,7 @@ class JWTTokenApi(BasePlugin):
key.import_from_pem(key_data)
except:
try:
key.import_key(k=key_data.decode('utf-8'),kty='oct')
key.import_key(k=key_data.decode('utf-8'), kty='oct')
except:
logger.error('Failed to correctly parse key data!')
return None
@ -373,7 +373,7 @@ class UnixDomainSocketDirectory(BasePlugin):
if not stat.S_ISSOCK(uds_path.stat().st_mode):
return None
return [ 'unix_socket', uds_path ]
return ['unix_socket', uds_path]
except Exception as e:
logger.error("Error finding unix domain socket: %s" % str(e))
return None

View File

@ -761,7 +761,7 @@ class WebSocket:
pend = plen
b = c = b''
if plen >= 4:
dtype=numpy.dtype('<u4')
dtype = numpy.dtype('<u4')
if sys.byteorder == 'big':
dtype = dtype.newbyteorder('>')
mask = numpy.frombuffer(mask, dtype, count=1)
@ -770,7 +770,7 @@ class WebSocket:
b = numpy.bitwise_xor(data, mask).tobytes()
if plen % 4:
dtype=numpy.dtype('B')
dtype = numpy.dtype('B')
if sys.byteorder == 'big':
dtype = dtype.newbyteorder('>')
mask = numpy.frombuffer(mask, dtype, count=(plen % 4))
@ -829,11 +829,11 @@ class WebSocket:
'payload' : decoded_buffer}
"""
f = {'fin' : 0,
'opcode' : 0,
'masked' : False,
'length' : 0,
'payload' : None}
f = {'fin': 0,
'opcode': 0,
'masked': False,
'length': 0,
'payload': None}
blen = len(buf)
hlen = 2
@ -871,9 +871,9 @@ class WebSocket:
if f['masked']:
# unmask payload
mask_key = buf[hlen-4:hlen]
f['payload'] = self._unmask(buf[hlen:(hlen+length)], mask_key)
mask_key = buf[hlen - 4:hlen]
f['payload'] = self._unmask(buf[hlen:(hlen + length)], mask_key)
else:
f['payload'] = buf[hlen:(hlen+length)]
f['payload'] = buf[hlen:(hlen + length)]
return f

View File

@ -63,7 +63,7 @@ Traffic Legend:
# clear out any existing SSL_ headers that the client might
# have maliciously set
ssl_headers = [ h for h in self.headers if h.startswith('SSL_') ]
ssl_headers = [h for h in self.headers if h.startswith('SSL_')]
for h in ssl_headers:
del self.headers[h]
@ -160,7 +160,7 @@ Traffic Legend:
else:
# Extract the token parameter from url
args = parse_qs(urlparse(self.path)[4]) # 4 is the query from url
args = parse_qs(urlparse(self.path)[4]) # 4 is the query from url
if 'token' in args and len(args['token']):
token = args['token'][0].rstrip('\n')
@ -269,7 +269,7 @@ Traffic Legend:
# Send queued target data to the client
if len(cqueue) != 0:
c_pend = True
while(c_pend):
while (c_pend):
c_pend = self.send_frames(cqueue)
cqueue = []
@ -293,20 +293,20 @@ class WebSocketProxy(websockifyserver.WebSockifyServer):
def __init__(self, RequestHandlerClass=ProxyRequestHandler, *args, **kwargs):
# Save off proxy specific options
self.target_host = kwargs.pop('target_host', None)
self.target_port = kwargs.pop('target_port', None)
self.wrap_cmd = kwargs.pop('wrap_cmd', None)
self.wrap_mode = kwargs.pop('wrap_mode', None)
self.unix_target = kwargs.pop('unix_target', None)
self.ssl_target = kwargs.pop('ssl_target', None)
self.heartbeat = kwargs.pop('heartbeat', None)
self.target_host = kwargs.pop('target_host', None)
self.target_port = kwargs.pop('target_port', None)
self.wrap_cmd = kwargs.pop('wrap_cmd', None)
self.wrap_mode = kwargs.pop('wrap_mode', None)
self.unix_target = kwargs.pop('unix_target', None)
self.ssl_target = kwargs.pop('ssl_target', None)
self.heartbeat = kwargs.pop('heartbeat', None)
self.token_plugin = kwargs.pop('token_plugin', None)
self.host_token = kwargs.pop('host_token', None)
self.auth_plugin = kwargs.pop('auth_plugin', None)
# Last 3 timestamps command was run
self.wrap_times = [0, 0, 0]
self.wrap_times = [0, 0, 0]
if self.wrap_cmd:
wsdir = os.path.dirname(sys.argv[0])
@ -334,7 +334,7 @@ class WebSocketProxy(websockifyserver.WebSockifyServer):
sock.close()
# Insert rebinder at the head of the (possibly empty) LD_PRELOAD pathlist
ld_preloads = filter(None, [ self.rebinder, os.environ.get("LD_PRELOAD", None) ])
ld_preloads = filter(None, [self.rebinder, os.environ.get("LD_PRELOAD", None)])
os.environ.update({
"LD_PRELOAD": os.pathsep.join(ld_preloads),
@ -401,7 +401,7 @@ class WebSocketProxy(websockifyserver.WebSockifyServer):
sys.exit(ret)
elif self.wrap_mode == "respawn":
now = time.time()
avg = sum(self.wrap_times)/len(self.wrap_times)
avg = sum(self.wrap_times) / len(self.wrap_times)
if (now - avg) < 10:
# 3 times in the last 10 seconds
if self.spawn_message:
@ -756,32 +756,32 @@ class LibProxyServer(ThreadingMixIn, HTTPServer):
def __init__(self, RequestHandlerClass=ProxyRequestHandler, **kwargs):
# Save off proxy specific options
self.target_host = kwargs.pop('target_host', None)
self.target_port = kwargs.pop('target_port', None)
self.wrap_cmd = kwargs.pop('wrap_cmd', None)
self.wrap_mode = kwargs.pop('wrap_mode', None)
self.unix_target = kwargs.pop('unix_target', None)
self.ssl_target = kwargs.pop('ssl_target', None)
self.token_plugin = kwargs.pop('token_plugin', None)
self.auth_plugin = kwargs.pop('auth_plugin', None)
self.heartbeat = kwargs.pop('heartbeat', None)
self.target_host = kwargs.pop('target_host', None)
self.target_port = kwargs.pop('target_port', None)
self.wrap_cmd = kwargs.pop('wrap_cmd', None)
self.wrap_mode = kwargs.pop('wrap_mode', None)
self.unix_target = kwargs.pop('unix_target', None)
self.ssl_target = kwargs.pop('ssl_target', None)
self.token_plugin = kwargs.pop('token_plugin', None)
self.auth_plugin = kwargs.pop('auth_plugin', None)
self.heartbeat = kwargs.pop('heartbeat', None)
self.token_plugin = None
self.auth_plugin = None
self.daemon = False
# Server configuration
listen_host = kwargs.pop('listen_host', '')
listen_port = kwargs.pop('listen_port', None)
web = kwargs.pop('web', '')
listen_host = kwargs.pop('listen_host', '')
listen_port = kwargs.pop('listen_port', None)
web = kwargs.pop('web', '')
# Configuration affecting base request handler
self.only_upgrade = not web
self.verbose = kwargs.pop('verbose', False)
self.only_upgrade = not web
self.verbose = kwargs.pop('verbose', False)
record = kwargs.pop('record', '')
if record:
self.record = os.path.abspath(record)
self.run_once = kwargs.pop('run_once', False)
self.run_once = kwargs.pop('run_once', False)
self.handler_id = 0
for arg in kwargs.keys():

View File

@ -94,7 +94,7 @@ class WebSocketRequestHandlerMixIn:
def handle_websocket(self):
"""Handle a WebSocket connection.
This is called when the WebSocket is ready to be used. A
sub-class should perform the necessary communication here and
return once done.

View File

@ -75,7 +75,7 @@ class WebSockifyRequestHandler(WebSocketRequestHandlerMixIn, SimpleHTTPRequestHa
self.daemon = getattr(server, "daemon", False)
self.record = getattr(server, "record", False)
self.run_once = getattr(server, "run_once", False)
self.rec = None
self.rec = None
self.handler_id = getattr(server, "handler_id", False)
self.file_only = getattr(server, "file_only", False)
self.traffic = getattr(server, "traffic", False)
@ -126,7 +126,7 @@ class WebSockifyRequestHandler(WebSocketRequestHandlerMixIn, SimpleHTTPRequestHa
fully sent, in which case the caller should call again when
the socket is ready. """
tdelta = int(time.time()*1000) - self.start_time
tdelta = int(time.time() * 1000) - self.start_time
if bufs:
for buf in bufs:
@ -157,7 +157,7 @@ class WebSockifyRequestHandler(WebSocketRequestHandlerMixIn, SimpleHTTPRequestHa
closed = False
bufs = []
tdelta = int(time.time()*1000) - self.start_time
tdelta = int(time.time() * 1000) - self.start_time
while True:
try:
@ -209,8 +209,8 @@ class WebSockifyRequestHandler(WebSocketRequestHandlerMixIn, SimpleHTTPRequestHa
self.server.ws_connection = True
# Initialize per client settings
self.send_parts = []
self.recv_part = None
self.start_time = int(time.time()*1000)
self.recv_part = None
self.start_time = int(time.time() * 1000)
# client_address is empty with, say, UNIX domain sockets
client_addr = ""
@ -347,35 +347,35 @@ class WebSockifyServer():
# settings
self.RequestHandlerClass = RequestHandlerClass
self.verbose = verbose
self.listen_fd = listen_fd
self.unix_listen = unix_listen
self.unix_listen_mode = unix_listen_mode
self.listen_host = listen_host
self.listen_port = listen_port
self.prefer_ipv6 = source_is_ipv6
self.ssl_only = ssl_only
self.ssl_ciphers = ssl_ciphers
self.ssl_options = ssl_options
self.verify_client = verify_client
self.daemon = daemon
self.run_once = run_once
self.timeout = timeout
self.idle_timeout = idle_timeout
self.traffic = traffic
self.file_only = file_only
self.web_auth = web_auth
self.verbose = verbose
self.listen_fd = listen_fd
self.unix_listen = unix_listen
self.unix_listen_mode = unix_listen_mode
self.listen_host = listen_host
self.listen_port = listen_port
self.prefer_ipv6 = source_is_ipv6
self.ssl_only = ssl_only
self.ssl_ciphers = ssl_ciphers
self.ssl_options = ssl_options
self.verify_client = verify_client
self.daemon = daemon
self.run_once = run_once
self.timeout = timeout
self.idle_timeout = idle_timeout
self.traffic = traffic
self.file_only = file_only
self.web_auth = web_auth
self.launch_time = time.time()
self.ws_connection = False
self.handler_id = 1
self.terminating = False
self.launch_time = time.time()
self.ws_connection = False
self.handler_id = 1
self.terminating = False
self.logger = self.get_logger()
self.tcp_keepalive = tcp_keepalive
self.tcp_keepcnt = tcp_keepcnt
self.tcp_keepidle = tcp_keepidle
self.tcp_keepintvl = tcp_keepintvl
self.logger = self.get_logger()
self.tcp_keepalive = tcp_keepalive
self.tcp_keepcnt = tcp_keepcnt
self.tcp_keepidle = tcp_keepidle
self.tcp_keepintvl = tcp_keepintvl
# keyfile path must be None if not specified
self.key = None
@ -444,7 +444,7 @@ class WebSockifyServer():
@staticmethod
def socket(host, port=None, connect=False, prefer_ipv6=False,
unix_socket=None, unix_socket_mode=None, unix_socket_listen=False,
use_ssl=False, tcp_keepalive=True, tcp_keepcnt=None,
use_ssl=False, tcp_keepalive=True, tcp_keepcnt=None,
tcp_keepidle=None, tcp_keepintvl=None):
""" Resolve a host (and optional port) to an IPv4 or IPv6
address. Create a socket. Bind to it if listen is set,
@ -472,7 +472,7 @@ class WebSockifyServer():
addrs.reverse()
sock = socket.socket(addrs[0][0], addrs[0][1])
if tcp_keepalive:
if tcp_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if tcp_keepcnt:
sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT,
@ -816,7 +816,7 @@ class WebSockifyServer():
startsock, address = lsock.accept()
# Unix Socket will not report address (empty string), but address[0] is logged a bunch
if self.unix_listen != None:
address = [ self.unix_listen ]
address = [self.unix_listen]
else:
continue
except self.Terminate:
@ -838,7 +838,7 @@ class WebSockifyServer():
if self.run_once:
# Run in same process if run_once
self.top_new_client(startsock, address)
if self.ws_connection :
if self.ws_connection:
self.msg('%s: exiting due to --run-once'
% address[0])
break