Split generic websocket code out from websockify.

Generic TCP client and Websocket server code is now split out from the
websockify specific code.
This commit is contained in:
Joel Martin 2012-01-27 16:01:17 -06:00
parent 3278905ead
commit f2f838f7e2
1 changed files with 101 additions and 65 deletions

View File

@ -2,6 +2,7 @@
(:use ring.adapter.jetty) (:use ring.adapter.jetty)
(:import (:import
;; Netty TCP Client ;; Netty TCP Client
[java.util.concurrent Executors] [java.util.concurrent Executors]
[java.net InetSocketAddress] [java.net InetSocketAddress]
@ -22,13 +23,6 @@
WebSocket WebSocketClientFactory WebSocketClient WebSocket WebSocketClientFactory WebSocketClient
WebSocketServlet])) WebSocketServlet]))
(defonce settings (atom {}))
;; WebSocket client to TCP target mappings
(defonce clients (atom {}))
(defonce targets (atom {}))
;; TCP / NIO ;; TCP / NIO
@ -73,14 +67,71 @@
;; WebSockets ;; WebSockets
;; http://wiki.eclipse.org/Jetty/Feature/WebSockets
(defn make-websocket-servlet [open close message]
(proxy [org.eclipse.jetty.websocket.WebSocketServlet] []
(doGet [request response]
;;(println "doGet" request)
(.. (proxy-super getServletContext)
(getNamedDispatcher (proxy-super getServletName))
(forward request response)))
(doWebSocketConnect [request response]
(println "doWebSocketConnect")
(reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage
(onOpen [this connection] (open this connection))
(onClose [this code message] (close this code message))
(onMessage [this data] (message this data))))))
(defn websocket-server
[port & {:keys [open close message ws-path web]
:or {open (fn [_ conn]
(println "New websocket client:" conn))
close (fn [_ code reason]
(println "Websocket client closed:" code reason))
message (fn [_ data]
(println "Websocket message:" data))
ws-path "/websocket"}}]
(let [http-servlet (doto (ServletHolder. (DefaultServlet.))
(.setInitParameter "dirAllowed" "true")
(.setInitParameter "resourceBase" web))
ws-servlet (ServletHolder.
(make-websocket-servlet open close message))
context (doto (ServletContextHandler.)
(.setContextPath "/")
(.addServlet ws-servlet ws-path))
connector (doto (BlockingChannelConnector.)
(.setPort port)
(.setMaxIdleTime Integer/MAX_VALUE))
server (doto (Server.)
(.setHandler context)
(.addConnector connector))]
(when web (.addServlet context http-servlet "/"))
server))
;; Websockify
(defonce settings (atom {}))
;; WebSocket client to TCP target mappings
(defonce clients (atom {}))
(defonce targets (atom {}))
(defn target-open [ctx e] (defn target-open [ctx e]
(println "Connected to target") (println "Connected to target")
#_(println "channelConnected:" e)) #_(println "channelConnected:" e))
(defn target-close [ctx e] (defn target-close [ctx e]
#_(println "channelDisconnected:" e) #_(println "channelDisconnected:" e)
(println "Target closed") (println "Target closed")
(when-let [channel (get @targets (.getChannel ctx))] (when-let [channel (get @targets (.getChannel ctx))]
(.disconnect channel))) (.disconnect channel)))
(defn target-message [ctx e] (defn target-message [ctx e]
(let [channel (.getChannel ctx) (let [channel (.getChannel ctx)
client (get @targets channel) client (get @targets channel)
@ -93,77 +144,62 @@
#_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8)) #_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8))
(.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8)))) (.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8))))
(defn client-open [this connection]
#_(println "Got WebSocket connection:" connection)
(println "New client")
(let [target (netty-client
(:target-host @settings)
(:target-port @settings)
target-open target-close target-message)]
(swap! clients assoc this {:client connection
:target target})
(swap! targets assoc target connection)))
;; http://wiki.eclipse.org/Jetty/Feature/WebSockets (defn client-close [this code message]
(defn make-websocket-handler [] (println "WebSocket connection closed")
(reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage (when-let [target (:target (get @clients this))]
(onOpen [this connection] (println "Closing target")
#_(println "Got WebSocket connection:" connection) (.close target)
(println "New client") (println "Target closed")
(let [target (netty-client (swap! targets dissoc target))
"localhost" 5901 (swap! clients dissoc this))
target-open target-close target-message)]
(swap! clients assoc this {:client connection
:target target})
(swap! targets assoc target connection)))
(onClose [this code message]
(println "WebSocket connection closed")
(when-let [target (:target (get @clients this))]
(println "Closing target")
(.close target)
(println "Target closed")
(swap! targets dissoc target))
(swap! clients dissoc this))
(onMessage [this data]
#_(println "WebSocket onMessage:" data)
(let [target (:target (get @clients this))
cbuf (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
decbuf (Base64/decode cbuf)
rlen (.readableBytes decbuf)]
#_(println "Sending" rlen "bytes to target")
#_(println "Sending to target:" (.toString decbuf 0 rlen CharsetUtil/UTF_8))
(.write target decbuf)))))
(defn make-websocket-servlet [] (defn client-message [this data]
(proxy [org.eclipse.jetty.websocket.WebSocketServlet] [] #_(println "WebSocket onMessage:" data)
(doGet [request response] (let [target (:target (get @clients this))
;(println "doGet" request) cbuf (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
(.. (proxy-super getServletContext) decbuf (Base64/decode cbuf)
(getNamedDispatcher (proxy-super getServletName)) rlen (.readableBytes decbuf)]
(forward request response))) #_(println "Sending" rlen "bytes to target")
(doWebSocketConnect [request response] #_(println "Sending to target:" (.toString decbuf 0 rlen CharsetUtil/UTF_8))
(println "doWebSocketConnect") (.write target decbuf)))
(make-websocket-handler))))
(defn start-websocket-server (defn start-websockify
[& {:keys [listen-port target-host target-port web] [& {:keys [listen-port target-host target-port web]
:or {listen-port 6080 :or {listen-port 6080
target-host "localhost" target-host "localhost"
target-port 5900 target-port 5900
}}] }}]
(reset! clients {}) (reset! clients {})
(reset! targets {}) (reset! targets {})
(let [http-servlet (doto (ServletHolder. (DefaultServlet.))
(.setInitParameter "dirAllowed" "true") (reset! settings {:target-host target-host
(.setInitParameter "resourceBase" web)) :target-port target-port})
ws-servlet (ServletHolder. (make-websocket-servlet)) (let [server (websocket-server listen-port
context (doto (ServletContextHandler.) :web web
(.setContextPath "/") :ws-path "/websockify"
(.addServlet ws-servlet "/websockify")) :open client-open
connector (doto (BlockingChannelConnector.) :close client-close
(.setPort listen-port) :message client-message)]
(.setMaxIdleTime Integer/MAX_VALUE))
server (doto (Server.) (.start server)
(.setHandler context)
(.addConnector connector)
(.start))]
(if web (if web
(do (println "Serving web requests from:" web)
(println "Serving web requests from:" web)
(.addServlet context http-servlet "/"))
(println "Not serving web requests")) (println "Not serving web requests"))
(defn stop [] (defn stop-websockify []
(doseq [client (vals @clients)] (doseq [client (vals @clients)]
(.disconnect (:client client)) (.disconnect (:client client))
(.close (:target client))) (.close (:target client)))