Working vesrion Clojure Websockify.

Still needs some cleanup related to cleaning up client and target
connections.
This commit is contained in:
Joel Martin 2012-01-27 15:14:45 -06:00 committed by Cédric de Saint Martin
parent 1e24aa17af
commit 040a129fbf
1 changed files with 67 additions and 30 deletions

View File

@ -1,30 +1,35 @@
(ns websockify (ns websockify
(:use ring.adapter.jetty) (:use ring.adapter.jetty)
;(:import [org.jboss.netty.handler.codec.base64 Base64])
(:import (:import
;[java.io BufferedReader DataOutputStream] ;; Netty TCP Client
[java.util.concurrent Executors]
[java.net InetSocketAddress] [java.net InetSocketAddress]
[java.nio ByteBuffer]
[java.nio.channels SocketChannel]
[org.jboss.netty.channel [org.jboss.netty.channel
Channels SimpleChannelHandler ChannelPipelineFactory] Channels SimpleChannelHandler ChannelPipelineFactory]
[org.jboss.netty.buffer ChannelBuffers]
[org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory] [org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory]
[org.jboss.netty.bootstrap ClientBootstrap] [org.jboss.netty.bootstrap ClientBootstrap]
[java.util.concurrent Executors] [org.jboss.netty.handler.codec.base64 Base64]
[org.jboss.netty.util CharsetUtil]
;; Jetty WebSocket Server
[org.eclipse.jetty.server Server] [org.eclipse.jetty.server Server]
[org.eclipse.jetty.server.nio BlockingChannelConnector] [org.eclipse.jetty.server.nio BlockingChannelConnector]
[org.eclipse.jetty.servlet ServletContextHandler ServletHolder DefaultServlet] [org.eclipse.jetty.servlet
ServletContextHandler ServletHolder DefaultServlet]
[org.eclipse.jetty.websocket [org.eclipse.jetty.websocket
WebSocket WebSocketClientFactory WebSocketClient WebSocket WebSocketClientFactory WebSocketClient
WebSocketServlet])) WebSocketServlet]))
(defonce settings (atom {})) (defonce settings (atom {}))
;; WebSocket client to TCP target mappings
(defonce clients (atom {}))
(defonce targets (atom {}))
;; TCP / NIO ;; TCP / NIO
;; (defn tcp-channel [host port] ;; (defn tcp-channel [host port]
@ -38,22 +43,21 @@
;; nil))) ;; nil)))
;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51 ;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51
(defn make-netty-client-handler [] ;; http://stackoverflow.com/questions/5453602/highly-concurrent-http-with-netty-and-nio
(proxy [SimpleChannelHandler] [] ;; https://github.com/datskos/ring-netty-adapter/blob/master/src/ring/adapter/netty.clj
(channelConnected [ctx e]
(println "channelConnected:" e))
(channelDisconnected [ctx e]
(println "channelDisconnected:" e))
(messageReceived [ctx e]
(println "messageReceived:" (.getMessage e)))
(exceptionCaught [ctx e]
(println "exceptionCaught:" e))))
(defn netty-client [host port]
(let [pipeline (proxy [ChannelPipelineFactory] [] (defn netty-client [host port open close message]
(let [handler (proxy [SimpleChannelHandler] []
(channelConnected [ctx e] (open ctx e))
(channelDisconnected [ctx e] (close ctx e))
(messageReceived [ctx e] (message ctx e))
(exceptionCaught [ctx e]
(println "exceptionCaught:" e)))
pipeline (proxy [ChannelPipelineFactory] []
(getPipeline [] (getPipeline []
(doto (Channels/pipeline) (doto (Channels/pipeline)
(.addLast "handler" (make-netty-client-handler))))) (.addLast "handler" handler))))
bootstrap (doto (ClientBootstrap. bootstrap (doto (ClientBootstrap.
(NioClientSocketChannelFactory. (NioClientSocketChannelFactory.
(Executors/newCachedThreadPool) (Executors/newCachedThreadPool)
@ -69,22 +73,50 @@
;; WebSockets ;; WebSockets
(defn target-open [ctx e]
(println "channelConnected:" e))
(defn target-close [ctx e]
(println "channelDisconnected:" e))
(defn target-message [ctx e]
(let [channel (.getChannel ctx)
client (get @targets channel)
msg (.getMessage e)
len (.readableBytes msg)
b64 (Base64/encode msg false)
blen (.readableBytes b64)]
(println "received " len "bytes from target")
#_(println "target receive:" (.toString msg 0 len CharsetUtil/UTF_8))
#_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8))
(.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8))
))
(defonce clients (atom {}))
;; http://wiki.eclipse.org/Jetty/Feature/WebSockets ;; http://wiki.eclipse.org/Jetty/Feature/WebSockets
(defn make-websocket-handler [] (defn make-websocket-handler []
(reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage (reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage
(onOpen [this connection] (onOpen [this connection]
(println "Got WebSocket connection:" connection) (println "Got WebSocket connection:" connection)
#_(let [target (tcp-channel "localhost" 5901)] (let [target (netty-client
"localhost" 5901
target-open target-close target-message)]
(swap! clients assoc this {:client connection (swap! clients assoc this {:client connection
:target target}))) :target target})
(swap! targets assoc target connection)))
(onClose [this code message] (onClose [this code message]
(println "Got WebSocket close:" code message) (do
(swap! clients dissoc this)) (let [target (:target (get @clients this))]
(swap! clients dissoc this)
(swap! targets dissoc target)
)))
(onMessage [this data] (onMessage [this data]
(println "Got WebSocket message:" data)))) (println "WebSocket onMessage:" data)
(let [target (:target (get @clients this))
cbuf (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
decbuf (Base64/decode cbuf)
rlen (.readableBytes decbuf)]
(println "Sending to target:" (.toString decbuf 0 rlen CharsetUtil/UTF_8))
(.write target decbuf)
))))
(defn make-websocket-servlet [] (defn make-websocket-servlet []
(proxy [org.eclipse.jetty.websocket.WebSocketServlet] [] (proxy [org.eclipse.jetty.websocket.WebSocketServlet] []
@ -103,13 +135,15 @@
target-host "localhost" target-host "localhost"
target-port 5900 target-port 5900
}}] }}]
(reset! clients {})
(reset! targets {})
(let [http-servlet (doto (ServletHolder. (DefaultServlet.)) (let [http-servlet (doto (ServletHolder. (DefaultServlet.))
(.setInitParameter "dirAllowed" "true") (.setInitParameter "dirAllowed" "true")
(.setInitParameter "resourceBase" web)) (.setInitParameter "resourceBase" web))
ws-servlet (ServletHolder. (make-websocket-servlet)) ws-servlet (ServletHolder. (make-websocket-servlet))
context (doto (ServletContextHandler.) context (doto (ServletContextHandler.)
(.setContextPath "/") (.setContextPath "/")
(.addServlet ws-servlet "/websocket")) (.addServlet ws-servlet "/websockify"))
connector (doto (BlockingChannelConnector.) connector (doto (BlockingChannelConnector.)
(.setPort listen-port) (.setPort listen-port)
(.setMaxIdleTime Integer/MAX_VALUE)) (.setMaxIdleTime Integer/MAX_VALUE))
@ -124,5 +158,8 @@
(println "Not serving web requests")) (println "Not serving web requests"))
(defn stop [] (defn stop []
(.stop server)))) (.stop server)
(reset! clients {})
(reset! targets {})
nil)))