From 6d4663de47c601f4aebf679ba5da8189cb84a70b Mon Sep 17 00:00:00 2001 From: li qun Date: Wed, 28 Feb 2018 14:28:07 +0800 Subject: [PATCH] Use encoded path in URL as vnc server:port Normally we only specify one vncserver address. Now we can generator URL with path=vncserver:port and user can use the link to connect vncserver:port. The path can be encoded with hash and creating date, then user can not change it and the link will be invalid after a few days. --- websockify/encode_url.py | 59 ++++++++++++++++++++++++------------ websockify/websocketproxy.py | 25 ++++++++++++++- 2 files changed, 64 insertions(+), 20 deletions(-) diff --git a/websockify/encode_url.py b/websockify/encode_url.py index da78d2c..ce2546d 100644 --- a/websockify/encode_url.py +++ b/websockify/encode_url.py @@ -4,37 +4,46 @@ import sys import hashlib import base64 +import datetime # Please change the salt for your own project. SALT = "Some salt for security. liqun@ncl.sg" -def encode(str): +def qencode(str, create_date=datetime.date.today(), salt=SALT): ''' The func encode str, hash it and base64 it.''' alg = hashlib.sha256() + tstr = create_date.strftime("%Y%m%d") + alg.update(tstr) + alg.update(':') alg.update(str) - alg.update(SALT) + alg.update(salt) hash = alg.hexdigest() - return base64.urlsafe_b64encode(hash+':'+str) -def decode(str): - ''' The func decode str, validate with hash and base64.decode it.''' + return base64.urlsafe_b64encode(hash+tstr+':'+str) +def qdecode(str, valid_daynum=3, salt=SALT): + ''' The func decode str, validate with hash and base64.decode it. + If valid_daynum =0, only today is valid.''' str1 = base64.urlsafe_b64decode(str) pos = str1.find(':') if pos == -1: return '' - hash = str1[0:pos] + hash = str1[0:pos-8] + tstr = str1[pos-8:pos] + url_date = datetime.date(int(str1[pos-8:pos-4]), int(str1[pos-4:pos-2]), int(str1[pos-2:pos])) + today = datetime.date.today() + if (today - url_date) > datetime.timedelta(valid_daynum): + return '' alg = hashlib.sha256() - alg.update(str1[pos+1:]) - alg.update(SALT) + alg.update(str1[pos-8:]) + alg.update(salt) hash1 = alg.hexdigest() if hash != hash1: - print "Error: str hash different" return '' return str1[pos+1:] -def get_server_from_path(path, is_encoded): +def get_server_from_path(path, is_encoded, valid_daynum=3, salt=SALT): '''The func decode host port from path parameter. - path looks like [/encode(n1.soc.cloud.ncl.sg:5901)] + path looks like [/qencode(n1.soc.cloud.ncl.sg:5901)] ''' try: if is_encoded: - str = decode(path[1:]) + str = qdecode(path[1:], valid_daynum, salt) else: str = path[1:] if str == '': @@ -51,10 +60,19 @@ def test_basic(): assert get_server_from_path('/n1.soc.cloud.ncl.sg:5901', False) == ('n1.soc.cloud.ncl.sg', 5901) str = "n1.soc.cloud.ncl.sg:5901" - enc = encode(str) - dec = decode(enc) + enc = qencode(str) + print enc + print base64.urlsafe_b64decode(enc) + dec = qdecode(enc) assert str == dec assert get_server_from_path('/'+enc, True) == ('n1.soc.cloud.ncl.sg', 5901) + enc = qencode(str, datetime.date(2018, 2, 24)) + assert (get_server_from_path('/'+enc, True)) == ('', 0) + enc = qencode(str, datetime.date.today(), 'test salt') + dec = qdecode(enc, 3, 'test salt') + assert str == dec + dec = qdecode(enc, 3, 'test salt1') + assert dec == '' def main(): '''The func is the main func.''' import sys @@ -63,17 +81,20 @@ def main(): print "Pass all test" exit() elif (len(sys.argv) == 2): - print encode(sys.argv[1]) + print qencode(sys.argv[1]) elif (len(sys.argv) == 3) and (sys.argv[1] == "decode"): - print decode(sys.argv[2]) + print qdecode(sys.argv[2]) + elif (len(sys.argv) == 5) and (sys.argv[1] == "decode"): + print qdecode(sys.argv[2],sys.argv[3],sys.argv[4]) + elif (len(sys.argv) == 5) and (sys.argv[1] == "encode"): + print qencode(sys.argv[2],sys.argv[3],sys.argv[4]) elif (len(sys.argv) == 3) and (sys.argv[1] == "debase"): str1 = base64.urlsafe_b64decode(sys.argv[2]) print str1 else: - print '''%s [content to encode] - decode [content to decode] + print '''%s [content to encode] [valid day number:3] [salt to encrypt] + decode [content to decode] [valid day number:3] [salt to encrypt] test''' % sys.argv[0] if __name__ == "__main__": main() - diff --git a/websockify/websocketproxy.py b/websockify/websocketproxy.py index 6c17c10..7adcd1f 100644 --- a/websockify/websocketproxy.py +++ b/websockify/websocketproxy.py @@ -30,6 +30,15 @@ try: except ImportError: from cgi import parse_qs from urlparse import urlparse +# Switch to open function let URL path define path. +# &path=vncserver.domain.name:port +URL_PATH_DEF_VNCSERVER = True +# Whether is URL path encoded +URL_PATH_ENCODED = False +# URL valid day number after creating. +URL_VALID_DAYNUM = 3 +# Salt used to valide the hash of the URL. +URL_SALT = 'Some salt used to hash the URL for integrity validation. liqun@ncl.sg' class ProxyRequestHandler(websockifyserver.WebSockifyRequestHandler): @@ -100,6 +109,13 @@ Traffic Legend: msg = "connecting to command: '%s' (port %s)" % (" ".join(self.server.wrap_cmd), self.server.target_port) elif self.server.unix_target: msg = "connecting to unix socket: %s" % self.server.unix_target + elif URL_PATH_DEF_VNCSERVER: + import encode_url + (ptarget_host, ptarget_port) = encode_url.get_server_from_path(\ + self.path, URL_PATH_ENCODED, URL_VALID_DAYNUM, URL_SALT) + if ptarget_port == 0: + raise self.server.EClose('Cannot decode path.') + msg = "connecting to: %s:%s" % (ptarget_host, ptarget_port) else: msg = "connecting to: %s:%s" % ( self.server.target_host, self.server.target_port) @@ -108,7 +124,14 @@ Traffic Legend: msg += " (using SSL)" self.log_message(msg) - tsock = websockifyserver.WebSockifyServer.socket(self.server.target_host, + if URL_PATH_DEF_VNCSERVER: + tsock = websockifyserver.WebSockifyServer.socket(ptarget_host, + ptarget_port, + connect=True, + use_ssl=self.server.ssl_target, + unix_socket=self.server.unix_target) + else: + tsock = websockifyserver.WebSockifyServer.socket(self.server.target_host, self.server.target_port, connect=True, use_ssl=self.server.ssl_target,