Switch to Jetty for Clojure WebSocket support.
This current code accepts WebSocket connections and can send data back and forth.
This commit is contained in:
parent
0664584843
commit
9aa2844856
|
|
@ -1,14 +1,12 @@
|
||||||
(defproject websockify "1.0.0-SNAPSHOT"
|
(defproject websockify "1.0.0-SNAPSHOT"
|
||||||
:description "Websockify"
|
:description "Websockify"
|
||||||
:dependencies [[org.clojure/clojure "1.2.1"]
|
:dependencies [[org.clojure/clojure "1.2.1"]
|
||||||
[aleph "0.2.0"]
|
|
||||||
|
|
||||||
[ring/ring-jetty-adapter "1.0.0-beta2"]
|
[ring/ring-jetty-adapter "1.0.0-beta2"]
|
||||||
;[ring-json-params "0.1.3"]
|
[org.eclipse.jetty/jetty-websocket "7.5.4.v20111024"]
|
||||||
[compojure "0.6.5"]
|
[org.eclipse.jetty/jetty-server "7.5.4.v20111024"]
|
||||||
|
[org.eclipse.jetty/jetty-servlet "7.5.4.v20111024"]
|
||||||
;[commons-codec/commons-codec "1.4"]
|
;[commons-codec/commons-codec "1.4"]
|
||||||
;[clj-base64 "0.0.0-SNAPSHOT"]
|
;[clj-base64 "0.0.0-SNAPSHOT"]
|
||||||
|
|
||||||
]
|
]
|
||||||
; :dev-dependencies [[swank-clojure "1.3.0-SNAPSHOT"]]
|
; :dev-dependencies [[swank-clojure "1.3.0-SNAPSHOT"]]
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,109 +1,69 @@
|
||||||
(ns websockify
|
(ns websockify
|
||||||
(:use compojure.core,
|
(:use ring.adapter.jetty)
|
||||||
ring.adapter.jetty,
|
;(:import [org.jboss.netty.handler.codec.base64 Base64])
|
||||||
aleph.http,
|
|
||||||
aleph.tcp,
|
|
||||||
aleph.http.core,
|
|
||||||
lamina.core)
|
|
||||||
(:require [compojure.route :as route]
|
|
||||||
[aleph.formats])
|
|
||||||
(:import [org.jboss.netty.handler.codec.base64 Base64])
|
|
||||||
|
|
||||||
(:use [clojure.java.io :only (file)])
|
(:import [java.net URL URI]
|
||||||
|
[org.eclipse.jetty.server Server]
|
||||||
|
[org.eclipse.jetty.server.nio BlockingChannelConnector]
|
||||||
|
[org.eclipse.jetty.servlet ServletContextHandler ServletHolder DefaultServlet]
|
||||||
|
[org.eclipse.jetty.websocket
|
||||||
|
WebSocket WebSocketClientFactory WebSocketClient
|
||||||
|
WebSocketServlet]))
|
||||||
|
|
||||||
(:import [java.net URL URI]))
|
(defonce settings (atom {}))
|
||||||
|
|
||||||
(def settings (atom {}))
|
|
||||||
|
|
||||||
;; WebSockets
|
;; WebSockets
|
||||||
(def clients (atom {}))
|
(defonce clients (atom {}))
|
||||||
|
|
||||||
;(defn base64-encode
|
|
||||||
; "Encodes the data into a base64 string representation."
|
|
||||||
; [data]
|
|
||||||
; (when data
|
|
||||||
; (-> data to-channel-buffer Base64/encode
|
|
||||||
; channel-buffer->string)))
|
|
||||||
;
|
|
||||||
;(defn base64-decode
|
|
||||||
; "Decodes a base64 encoded string into bytes."
|
|
||||||
; [string]
|
|
||||||
; (when string
|
|
||||||
; (-> string string->channel-buffer Base64/decode)))
|
|
||||||
|
|
||||||
|
|
||||||
(defn receive-client-msg [client target b64]
|
(defn make-websocket-handler []
|
||||||
(if b64
|
(reify org.eclipse.jetty.websocket.WebSocket$OnTextMessage
|
||||||
(try
|
(onOpen [this connection]
|
||||||
(println "Client b64 data: " b64)
|
(println "Got WebSocket connection:" connection)
|
||||||
(let [raw (aleph.formats/base64-decode b64)]
|
(swap! clients assoc this connection))
|
||||||
(do
|
(onClose [this code message]
|
||||||
(println "Sending to target: " (pr-str (.toString raw "latin1")))
|
(println "Got WebSocket close:" code message)
|
||||||
(println "Sending to target: " (map #(mod % 256) (into-array ^char (.array raw))))
|
(swap! clients dissoc this))
|
||||||
(enqueue target (.array raw))))
|
(onMessage [this data]
|
||||||
(catch Exception e (println e)))
|
(println "Got WebSocket message:" data))))
|
||||||
(do
|
|
||||||
(println "Client Closed")
|
|
||||||
(swap! clients dissoc client)
|
|
||||||
(close target)
|
|
||||||
(close client))))
|
|
||||||
|
|
||||||
(defn receive-target-msg [client target raw]
|
(defn websocket-servlet []
|
||||||
(if raw
|
(proxy [org.eclipse.jetty.websocket.WebSocketServlet] []
|
||||||
(try
|
(doGet [request response]
|
||||||
(println "Target raw data: " (pr-str (aleph.formats/bytes->string raw "latin1")))
|
;(println "doGet" request)
|
||||||
(println "Target raw class: " (class raw))
|
(.. (proxy-super getServletContext)
|
||||||
(println "Target raw array: " (.array raw))
|
(getNamedDispatcher (proxy-super getServletName))
|
||||||
(println "Target raw data: " (map #(mod % 256) (into-array ^char (.array raw))))
|
(forward request response)))
|
||||||
(let [b64 (aleph.formats/base64-encode raw)]
|
(doWebSocketConnect [request response]
|
||||||
(do
|
(println "doWebSocketConnect")
|
||||||
(println "Sending to client: " b64)
|
(make-websocket-handler))))
|
||||||
(enqueue client b64)))
|
|
||||||
(catch Exception e (println e)))
|
|
||||||
(do
|
|
||||||
(println "Target Closed")
|
|
||||||
(swap! clients dissoc client)
|
|
||||||
(close client)
|
|
||||||
(close target))
|
|
||||||
))
|
|
||||||
|
|
||||||
(defn ws-handler [client handshake]
|
(defn start-websocket-server
|
||||||
(let [target-host (@settings :target-host)
|
[& {:keys [listen-port target-host target-port web]
|
||||||
target-port (@settings :target-port)
|
|
||||||
target @(tcp-client {:host target-host, :port target-port})]
|
|
||||||
(println "Connected to target")
|
|
||||||
(println "client: " (class client) ", target: " (class target))
|
|
||||||
(receive-all client #(receive-client-msg client target %))
|
|
||||||
(println "Started receive-client-msg")
|
|
||||||
(receive-all target #(receive-target-msg client target %))
|
|
||||||
(println "Started receive-target-msg")
|
|
||||||
(swap! clients assoc client target)))
|
|
||||||
|
|
||||||
;; HTTP
|
|
||||||
(defn get-routes [root]
|
|
||||||
(defroutes main-routes
|
|
||||||
(GET "*" {websocket :websocket}
|
|
||||||
(when websocket (wrap-aleph-handler ws-handler)))
|
|
||||||
|
|
||||||
(route/files "/"
|
|
||||||
{:root root}
|
|
||||||
(route/not-found (file "www/404.html")))
|
|
||||||
(route/not-found "<h1>Page not found</h1>")))
|
|
||||||
|
|
||||||
(defn start
|
|
||||||
"Start websockify
|
|
||||||
:listen-port - port to start server on (default 6080)
|
|
||||||
:opts - map of Noir server options"
|
|
||||||
[& {:keys [listen-port target-host target-port web opts]
|
|
||||||
:or {listen-port 6080
|
:or {listen-port 6080
|
||||||
target-host "localhost"
|
target-host "localhost"
|
||||||
target-port 5900
|
target-port 5900
|
||||||
web "./"
|
}}]
|
||||||
opts {}}}]
|
(let [http-servlet (doto (ServletHolder. (DefaultServlet.))
|
||||||
|
(.setInitParameter "dirAllowed" "true")
|
||||||
|
(.setInitParameter "resourceBase" web))
|
||||||
|
ws-servlet (ServletHolder. (websocket-servlet))
|
||||||
|
context (doto (ServletContextHandler.)
|
||||||
|
(.setContextPath "/")
|
||||||
|
(.addServlet ws-servlet "/websocket"))
|
||||||
|
connector (doto (BlockingChannelConnector.)
|
||||||
|
(.setPort listen-port)
|
||||||
|
(.setMaxIdleTime Integer/MAX_VALUE))
|
||||||
|
server (doto (Server.)
|
||||||
|
(.setHandler context)
|
||||||
|
(.addConnector connector)
|
||||||
|
(.start))]
|
||||||
|
(if web
|
||||||
|
(do
|
||||||
|
(println "Serving web requests from:" web)
|
||||||
|
(.addServlet context http-servlet "/"))
|
||||||
|
(println "Not serving web requests"))
|
||||||
|
|
||||||
|
(defn stop []
|
||||||
|
(.stop server))))
|
||||||
|
|
||||||
(reset! settings {:target-host target-host
|
|
||||||
:target-port target-port})
|
|
||||||
(def stop
|
|
||||||
(let [stop (start-http-server (wrap-ring-handler (get-routes web))
|
|
||||||
{:port listen-port :websocket true})]
|
|
||||||
(fn [] (stop)))))
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue