Use encoded path in URL as vnc server:port
Normally we only specify one vncserver address. Now we can generator URL with path=vncserver:port and user can use the link to connect vncserver:port. The path can be encoded with hash and creating date, then user can not change it and the link will be invalid after a few days.
This commit is contained in:
parent
0b36a3fdb6
commit
6d4663de47
|
|
@ -4,37 +4,46 @@
|
|||
import sys
|
||||
import hashlib
|
||||
import base64
|
||||
import datetime
|
||||
# Please change the salt for your own project.
|
||||
SALT = "Some salt for security. liqun@ncl.sg"
|
||||
def encode(str):
|
||||
def qencode(str, create_date=datetime.date.today(), salt=SALT):
|
||||
''' The func encode str, hash it and base64 it.'''
|
||||
alg = hashlib.sha256()
|
||||
tstr = create_date.strftime("%Y%m%d")
|
||||
alg.update(tstr)
|
||||
alg.update(':')
|
||||
alg.update(str)
|
||||
alg.update(SALT)
|
||||
alg.update(salt)
|
||||
hash = alg.hexdigest()
|
||||
return base64.urlsafe_b64encode(hash+':'+str)
|
||||
def decode(str):
|
||||
''' The func decode str, validate with hash and base64.decode it.'''
|
||||
return base64.urlsafe_b64encode(hash+tstr+':'+str)
|
||||
def qdecode(str, valid_daynum=3, salt=SALT):
|
||||
''' The func decode str, validate with hash and base64.decode it.
|
||||
If valid_daynum =0, only today is valid.'''
|
||||
str1 = base64.urlsafe_b64decode(str)
|
||||
pos = str1.find(':')
|
||||
if pos == -1:
|
||||
return ''
|
||||
hash = str1[0:pos]
|
||||
hash = str1[0:pos-8]
|
||||
tstr = str1[pos-8:pos]
|
||||
url_date = datetime.date(int(str1[pos-8:pos-4]), int(str1[pos-4:pos-2]), int(str1[pos-2:pos]))
|
||||
today = datetime.date.today()
|
||||
if (today - url_date) > datetime.timedelta(valid_daynum):
|
||||
return ''
|
||||
alg = hashlib.sha256()
|
||||
alg.update(str1[pos+1:])
|
||||
alg.update(SALT)
|
||||
alg.update(str1[pos-8:])
|
||||
alg.update(salt)
|
||||
hash1 = alg.hexdigest()
|
||||
if hash != hash1:
|
||||
print "Error: str hash different"
|
||||
return ''
|
||||
return str1[pos+1:]
|
||||
def get_server_from_path(path, is_encoded):
|
||||
def get_server_from_path(path, is_encoded, valid_daynum=3, salt=SALT):
|
||||
'''The func decode host port from path parameter.
|
||||
path looks like [/encode(n1.soc.cloud.ncl.sg:5901)]
|
||||
path looks like [/qencode(n1.soc.cloud.ncl.sg:5901)]
|
||||
'''
|
||||
try:
|
||||
if is_encoded:
|
||||
str = decode(path[1:])
|
||||
str = qdecode(path[1:], valid_daynum, salt)
|
||||
else:
|
||||
str = path[1:]
|
||||
if str == '':
|
||||
|
|
@ -51,10 +60,19 @@ def test_basic():
|
|||
assert get_server_from_path('/n1.soc.cloud.ncl.sg:5901', False) == ('n1.soc.cloud.ncl.sg', 5901)
|
||||
|
||||
str = "n1.soc.cloud.ncl.sg:5901"
|
||||
enc = encode(str)
|
||||
dec = decode(enc)
|
||||
enc = qencode(str)
|
||||
print enc
|
||||
print base64.urlsafe_b64decode(enc)
|
||||
dec = qdecode(enc)
|
||||
assert str == dec
|
||||
assert get_server_from_path('/'+enc, True) == ('n1.soc.cloud.ncl.sg', 5901)
|
||||
enc = qencode(str, datetime.date(2018, 2, 24))
|
||||
assert (get_server_from_path('/'+enc, True)) == ('', 0)
|
||||
enc = qencode(str, datetime.date.today(), 'test salt')
|
||||
dec = qdecode(enc, 3, 'test salt')
|
||||
assert str == dec
|
||||
dec = qdecode(enc, 3, 'test salt1')
|
||||
assert dec == ''
|
||||
def main():
|
||||
'''The func is the main func.'''
|
||||
import sys
|
||||
|
|
@ -63,17 +81,20 @@ def main():
|
|||
print "Pass all test"
|
||||
exit()
|
||||
elif (len(sys.argv) == 2):
|
||||
print encode(sys.argv[1])
|
||||
print qencode(sys.argv[1])
|
||||
elif (len(sys.argv) == 3) and (sys.argv[1] == "decode"):
|
||||
print decode(sys.argv[2])
|
||||
print qdecode(sys.argv[2])
|
||||
elif (len(sys.argv) == 5) and (sys.argv[1] == "decode"):
|
||||
print qdecode(sys.argv[2],sys.argv[3],sys.argv[4])
|
||||
elif (len(sys.argv) == 5) and (sys.argv[1] == "encode"):
|
||||
print qencode(sys.argv[2],sys.argv[3],sys.argv[4])
|
||||
elif (len(sys.argv) == 3) and (sys.argv[1] == "debase"):
|
||||
str1 = base64.urlsafe_b64decode(sys.argv[2])
|
||||
print str1
|
||||
else:
|
||||
print '''%s [content to encode]
|
||||
decode [content to decode]
|
||||
print '''%s [content to encode] [valid day number:3] [salt to encrypt]
|
||||
decode [content to decode] [valid day number:3] [salt to encrypt]
|
||||
test''' % sys.argv[0]
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,15 @@ try:
|
|||
except ImportError:
|
||||
from cgi import parse_qs
|
||||
from urlparse import urlparse
|
||||
# Switch to open function let URL path define path.
|
||||
# &path=vncserver.domain.name:port
|
||||
URL_PATH_DEF_VNCSERVER = True
|
||||
# Whether is URL path encoded
|
||||
URL_PATH_ENCODED = False
|
||||
# URL valid day number after creating.
|
||||
URL_VALID_DAYNUM = 3
|
||||
# Salt used to valide the hash of the URL.
|
||||
URL_SALT = 'Some salt used to hash the URL for integrity validation. liqun@ncl.sg'
|
||||
|
||||
class ProxyRequestHandler(websockifyserver.WebSockifyRequestHandler):
|
||||
|
||||
|
|
@ -100,6 +109,13 @@ Traffic Legend:
|
|||
msg = "connecting to command: '%s' (port %s)" % (" ".join(self.server.wrap_cmd), self.server.target_port)
|
||||
elif self.server.unix_target:
|
||||
msg = "connecting to unix socket: %s" % self.server.unix_target
|
||||
elif URL_PATH_DEF_VNCSERVER:
|
||||
import encode_url
|
||||
(ptarget_host, ptarget_port) = encode_url.get_server_from_path(\
|
||||
self.path, URL_PATH_ENCODED, URL_VALID_DAYNUM, URL_SALT)
|
||||
if ptarget_port == 0:
|
||||
raise self.server.EClose('Cannot decode path.')
|
||||
msg = "connecting to: %s:%s" % (ptarget_host, ptarget_port)
|
||||
else:
|
||||
msg = "connecting to: %s:%s" % (
|
||||
self.server.target_host, self.server.target_port)
|
||||
|
|
@ -108,7 +124,14 @@ Traffic Legend:
|
|||
msg += " (using SSL)"
|
||||
self.log_message(msg)
|
||||
|
||||
tsock = websockifyserver.WebSockifyServer.socket(self.server.target_host,
|
||||
if URL_PATH_DEF_VNCSERVER:
|
||||
tsock = websockifyserver.WebSockifyServer.socket(ptarget_host,
|
||||
ptarget_port,
|
||||
connect=True,
|
||||
use_ssl=self.server.ssl_target,
|
||||
unix_socket=self.server.unix_target)
|
||||
else:
|
||||
tsock = websockifyserver.WebSockifyServer.socket(self.server.target_host,
|
||||
self.server.target_port,
|
||||
connect=True,
|
||||
use_ssl=self.server.ssl_target,
|
||||
|
|
|
|||
Loading…
Reference in New Issue