From a5d4fe1d18dcd2ee669cd1f363273e4ccd905cf2 Mon Sep 17 00:00:00 2001 From: Joel Martin Date: Fri, 27 Jan 2012 15:14:45 -0600 Subject: [PATCH] Working vesrion Clojure Websockify. Still needs some cleanup related to cleaning up client and target connections. --- other/websockify.clj | 97 ++++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/other/websockify.clj b/other/websockify.clj index fb22f5a..c422f4d 100644 --- a/other/websockify.clj +++ b/other/websockify.clj @@ -1,30 +1,35 @@ (ns websockify (:use ring.adapter.jetty) - ;(:import [org.jboss.netty.handler.codec.base64 Base64]) - (:import - ;[java.io BufferedReader DataOutputStream] - + ;; Netty TCP Client + [java.util.concurrent Executors] [java.net InetSocketAddress] - [java.nio ByteBuffer] - [java.nio.channels SocketChannel] - [org.jboss.netty.channel Channels SimpleChannelHandler ChannelPipelineFactory] + [org.jboss.netty.buffer ChannelBuffers] [org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory] [org.jboss.netty.bootstrap ClientBootstrap] - [java.util.concurrent Executors] + [org.jboss.netty.handler.codec.base64 Base64] + [org.jboss.netty.util CharsetUtil] + ;; Jetty WebSocket Server [org.eclipse.jetty.server Server] [org.eclipse.jetty.server.nio BlockingChannelConnector] - [org.eclipse.jetty.servlet ServletContextHandler ServletHolder DefaultServlet] + [org.eclipse.jetty.servlet + ServletContextHandler ServletHolder DefaultServlet] [org.eclipse.jetty.websocket WebSocket WebSocketClientFactory WebSocketClient WebSocketServlet])) (defonce settings (atom {})) +;; WebSocket client to TCP target mappings + +(defonce clients (atom {})) +(defonce targets (atom {})) + + ;; TCP / NIO ;; (defn tcp-channel [host port] @@ -38,22 +43,21 @@ ;; nil))) ;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51 -(defn make-netty-client-handler [] - (proxy [SimpleChannelHandler] [] - (channelConnected [ctx e] - (println "channelConnected:" e)) - (channelDisconnected [ctx e] - (println "channelDisconnected:" e)) - (messageReceived [ctx e] - (println "messageReceived:" (.getMessage e))) - (exceptionCaught [ctx e] - (println "exceptionCaught:" e)))) +;; http://stackoverflow.com/questions/5453602/highly-concurrent-http-with-netty-and-nio +;; https://github.com/datskos/ring-netty-adapter/blob/master/src/ring/adapter/netty.clj -(defn netty-client [host port] - (let [pipeline (proxy [ChannelPipelineFactory] [] + +(defn netty-client [host port open close message] + (let [handler (proxy [SimpleChannelHandler] [] + (channelConnected [ctx e] (open ctx e)) + (channelDisconnected [ctx e] (close ctx e)) + (messageReceived [ctx e] (message ctx e)) + (exceptionCaught [ctx e] + (println "exceptionCaught:" e))) + pipeline (proxy [ChannelPipelineFactory] [] (getPipeline [] (doto (Channels/pipeline) - (.addLast "handler" (make-netty-client-handler))))) + (.addLast "handler" handler)))) bootstrap (doto (ClientBootstrap. (NioClientSocketChannelFactory. (Executors/newCachedThreadPool) @@ -69,22 +73,50 @@ ;; WebSockets +(defn target-open [ctx e] + (println "channelConnected:" e)) +(defn target-close [ctx e] + (println "channelDisconnected:" e)) +(defn target-message [ctx e] + (let [channel (.getChannel ctx) + client (get @targets channel) + msg (.getMessage e) + len (.readableBytes msg) + b64 (Base64/encode msg false) + blen (.readableBytes b64)] + (println "received " len "bytes from target") + #_(println "target receive:" (.toString msg 0 len CharsetUtil/UTF_8)) + #_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8)) + (.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8)) + )) -(defonce clients (atom {})) ;; http://wiki.eclipse.org/Jetty/Feature/WebSockets (defn make-websocket-handler [] (reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage (onOpen [this connection] (println "Got WebSocket connection:" connection) - #_(let [target (tcp-channel "localhost" 5901)] + (let [target (netty-client + "localhost" 5901 + target-open target-close target-message)] (swap! clients assoc this {:client connection - :target target}))) + :target target}) + (swap! targets assoc target connection))) (onClose [this code message] - (println "Got WebSocket close:" code message) - (swap! clients dissoc this)) + (do + (let [target (:target (get @clients this))] + (swap! clients dissoc this) + (swap! targets dissoc target) + ))) (onMessage [this data] - (println "Got WebSocket message:" data)))) + (println "WebSocket onMessage:" data) + (let [target (:target (get @clients this)) + cbuf (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8) + decbuf (Base64/decode cbuf) + rlen (.readableBytes decbuf)] + (println "Sending to target:" (.toString decbuf 0 rlen CharsetUtil/UTF_8)) + (.write target decbuf) + )))) (defn make-websocket-servlet [] (proxy [org.eclipse.jetty.websocket.WebSocketServlet] [] @@ -103,13 +135,15 @@ target-host "localhost" target-port 5900 }}] + (reset! clients {}) + (reset! targets {}) (let [http-servlet (doto (ServletHolder. (DefaultServlet.)) (.setInitParameter "dirAllowed" "true") (.setInitParameter "resourceBase" web)) ws-servlet (ServletHolder. (make-websocket-servlet)) context (doto (ServletContextHandler.) (.setContextPath "/") - (.addServlet ws-servlet "/websocket")) + (.addServlet ws-servlet "/websockify")) connector (doto (BlockingChannelConnector.) (.setPort listen-port) (.setMaxIdleTime Integer/MAX_VALUE)) @@ -124,5 +158,8 @@ (println "Not serving web requests")) (defn stop [] - (.stop server)))) + (.stop server) + (reset! clients {}) + (reset! targets {}) + nil)))