SockJS Java 客户端自动重新连接

SockJS Java Client auto reconnect

我正在使用 SockJs Java 客户端连接不同服务器上的 websocket 运行。一切正常,比如服务器发布消息,我的 Java 客户端收到它,但是如果服务器重新启动,那么当我重新启动客户端时我无法收到任何 reply.But 然后一切正常。所以我想通过重新启动 SockJs Java 客户端来实现重新连接逻辑。我的代码如下:

public class Application {

    private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

    private static Logger logger = Logger.getLogger(Application.class);

    public static void main(String[] args) throws InterruptedException, ExecutionException {, args);

        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);

        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());

        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);

        String url = "ws://{host}:{port}/hello";

        ListenableFuture<StompSession> f = stompClient.connect(url, headers, new MyWebSocketHandler(), "localhost", 9090);
        StompSession stompSession = f.get();"Subscribing to greeting topic using session " + stompSession);


    public static void subscribeGreetings(StompSession stompSession) throws ExecutionException, InterruptedException {

        stompSession.subscribe("/topic/greetings", new StompFrameHandler() {

            public Type getPayloadType(StompHeaders stompHeaders) {
                return byte[].class;

            public void handleFrame(StompHeaders stompHeaders, Object o) {
      "Received greeting " + new String((byte[]) o));

您可以实施代理模式。我使用 Java 模式重新连接。 但是我的实现是针对原始 Websocket java 客户端的。

代理模式涉及原始对象的功能。您需要将 reconnect() 和 onReconnect() 方法添加到 websocket 的生命周期中。 Websocket 只有 onError、onConnect、OnMessage、OnClose 方法。

我将为 SockJS Java 客户端实现一个重新连接器,然后我会 post 如果您需要代码。

我的一些 java 代码,但用于原始 websocket,不使用 STOMP。 基于此 java 脚本重新连接实现:


 * Problemas conocidos:
 *  Si se realiza la instancia dos veces consecutivas, se generan dos conexiones. Esto es debido a que el Executor
 *   crea dos tareas y no limita.
 * Una posible solucion seria convertir esta API de reconexion en un Singleton, sin embargo hacer eso significa solo
 *  poder crear una coneccion.
 * Una forma de solucionar esto es crear una fabrica de websockets. Y por otro lado tener la API como un singleton. 
 * Para reproducir el problema:
 *      ReconnectWebsocket ws = new ReconnectWebsocket(new LogicWsExternal() {
            public void onOpen(Session session) {
            }   // This is myPersonalEndPoint

            },uri, settings

        ws = new ReconnectWebsocket(new LogicWsExternal() {
            public void onOpen(Session session) {
            }   // This is myPersonalEndPoint

            },uri, settings
        // .. dos conexiones...
 * @author gas
public class ReconnectWebsocket implements ReconnectObserver{
    private final Logger LOG = LoggerFactory.getLogger(ReconnectWebsocket.class);

    public static final String ANSI_RESET = "\u001B[0m";
    public static final String ANSI_BLUE = "\u001B[34m";
    public static final String ANSI_WHITE = "\u001B[37m";

    // private static ReconnectWebsocketTest instance = null;

    Boolean debug;
    Boolean automaticOpen;
    Integer reconnectInterval;
    Integer maxReconnectionInterval;
    Float reconnectDecay;
    Integer timeoutInterval;
    Integer maxConnectAttempts;
    String binaryType;

    // These should be treated as read-only properties

    /** The URL as resolved by the constructor. This is always an absolute URL. Reas only. */
    URI path;

    /** The number of attempted reconnects since starting, or the last successful connection. Read only. */
    int connectAttemptsCount;

    * The current state of the connection.
    * Can be one of: WebSocket.CONNECTING, WebSocket.OPEN, WebSocket.CLOSING, WebSocket.CLOSED
    * Read only.
    WebSocketStates readyState = WebSocketStates.CLOSED;

     * A string indicating the name of the sub-protocol the server selected; this will be one of
     * the strings specified in the protocols parameter when creating the WebSocket object.
     * Read only.
    // TODO
    //this.protocol = null;

    // Private state variables
    //ReconnectWebsocket self = this;
    Session session;                    // In Javascript implementation is ws variable.
    boolean forcedClose = false;
    boolean timeOutFlag = false;

    Future<Boolean> timerReconnectionFuture;            // Usado para controlar el timer para las reconexnes.
    //private ReconnectObservable observable;

    LogicWsExternal logicExternal;

    ClientManager client;
    LogicWs wsLogic;

    static ScheduledExecutorService executor;
    static ScheduledExecutorService executor2;

    Future openFuture;
    AtomicBoolean openFlag;
     * Tyrus estates:
     *      org.glassfish.tyrus.core.TyrusWebSocket.State
     * */
    public static enum WebSocketStates {                //  Tyrus:
        CONNECTING("CONNECTING",0)      //      NEW
        ,OPEN("OPEN",1)                 //      CONNECTED
        ,CLOSING("CLOSING",2)           //      CLOSING
        ,CLOSED("CLOSED",3)             //      CLOSED
        String desc;
        Integer statusInt;
        WebSocketStates(String desc,Integer statusInt){
        public String getDescription() {
            return desc;
        public Integer getStatusInt() {
            return statusInt;
        public void printDescription(){
            System.out.println("PrinterStatus: "+desc);

    public ReconnectWebsocket(URI path) throws DeploymentException, IOException {
        this(new LogicWsExternal(){
            public void onOpen(Session session) { }},path, null);

    // Consturctor whith Only package visivility
    public ReconnectWebsocket(LogicWsExternal logicWsExernal, URI path) throws DeploymentException, IOException {
        this(logicWsExernal,path, null);

    public ReconnectWebsocket(LogicWsExternal logicWsExternal, URI path, ReconnectSettings settings) {

        // Default setting
        // Overwrite and define settings with options if they exist.

        /** Wheter this instance should log debug mesages. */
        this.debug = settings.getDebug()!=null ? settings.getDebug() : true;

        /** Wheter or not the websocket should attempt to connect immediately upon instantiation. */
        this.automaticOpen = settings.getAutomaticOpen() !=null ? settings.getAutomaticOpen() : true;

        /** The number of milliseconds to delay before attempting to reconnect. */
        this.reconnectInterval = settings.getReconnectInterval()!=null ? settings.getReconnectInterval() : 1000;

        /** The maximum number of milliseconds to delay a reconnection attempt. Timeout to reconnect */
        this.maxReconnectionInterval = settings.getMaxReconnectionInterval()!=null ? settings.getMaxReconnectionInterval() : 10000;

        /** The rate of increase of the reconnect delay. Allows reconnect attemps to back off when problems persist. */
        this.reconnectDecay = settings.getReconnectDecay()!=null ? settings.getReconnectDecay() : (float) 1.3;

        /** The maximum time in milliseconds to wait for a connection to succeed before closing and retrying */
        this.timeoutInterval = settings.getTimeoutInterval()!=null ? settings.getTimeoutInterval() : 5000;

        /** The number of connection attempts to make before to stop. Unlimited if value is zero.
        this.maxConnectAttempts = settings.getMaxConnectAttempts()!=null ? settings.getMaxConnectAttempts() : 0;

        /** The binary type, possible values 'blob' or 'arraybuffer', default 'blob'. */
        this.binaryType = settings.getBinaryType()!=null ? settings.getBinaryType() : "blob";

        //settings.put("idStateEvenbusChannel", "false");
        //settings.put("idStateEvenbusChannel", "false");

        // These should be treated as read-only properties

        /** The URL as resolved by the constructor. This is always an absolute URL. Reas only. */
        this.path = path;

        /** The number of attempted reconnects since starting, or the last successful connection. Read only. */
        this.connectAttemptsCount = 0;

        * The current state of the connection.
        * Can be one of: WebSocket.CONNECTING, WebSocket.OPEN, WebSocket.CLOSING, WebSocket.CLOSED
        * Read only.
        this.readyState = WebSocketStates.CLOSED;

         * A string indicating the name of the sub-protocol the server selected; this will be one of
         * the strings specified in the protocols parameter when creating the WebSocket object.
         * Read only.
        // TODO 

         // "has a" rather than "is a" observable
        //observable = new ReconnectObservable();

        this.logicExternal = logicWsExternal;

        wsLogic = new LogicWs();
        //client =  ClientManager.createClient();//GLiszli lient by default
        // Java 7 cient. 
        client = ClientManager.createClient(JdkClientContainer.class.getName());


        // By default initialize the executors.
        executor = Executors.newScheduledThreadPool(1);
        executor2 = Executors.newScheduledThreadPool(1);
        openFlag = new AtomicBoolean(true);

        // Wheher or not to create a websocket upon instantiation
        if (this.automaticOpen) {


    public void open() {

        if (readyState == WebSocketStates.CONNECTING || readyState == WebSocketStates.OPEN ) {

        if (executor.isShutdown()) {
            executor = Executors.newScheduledThreadPool(1);

        if (executor2.isShutdown()) {
             * Este poolthread se apaga cuando se manda a llamar la funcion close() de la API.
             * El apagado se realiza porque se considera que ya no se va o volver a conectar.
            executor2 = Executors.newScheduledThreadPool(1);

         * Resetear variables
        AtomicInteger counter = new AtomicInteger(0);
        connectAttemptsCount = 0;

        readyState = WebSocketStates.CONNECTING;

        // Ejecutar funciones en metodo OnConnecting
        //String reconnectReason = e.getMessage();
        //self.update(new InternalMessageWs(WsEventType.ONCONNECTING,new OnConnectingEvent(reconnectReason)));
        update(new InternalMessageWs(WsEventType.ONCONNECTING,new OnConnectingEvent("First Connect")));

        Runnable openRun = () -> {
                if (debug) {
                    System.out.println("DEBUG: ReconnectingWebSocket attempt-connect# "+(connectAttemptsCount+1)+" of "+(maxConnectAttempts==0?"infinite":maxConnectAttempts)+" URI="+path.getPath());

                Callable<Session> task1 = new Callable<Session>() {
                    public Session call() throws Exception {
                        // Avizar al API que se esta intentando realizar una reconexión. Patron productor-consumidor.
                        // TODO Deberia de ser un hilo?? ESto ya que podria se r que algo en el metodo externo sea de tipo bloqueante.

                        Session session = client.connectToServer(wsLogic,path);

                        //self.session = client.connectToServer(wsLogic,self.path);

                        //System.out.println("ReconnectWebsocket:: client.connectToServer(...) is null:"+(session==null?"true":"false"));
                        //System.out.println("ReconnectWebsocket:: client.connectToServer(...) is open:"+session.isOpen());

                        return session;
                Future<Session> future = executor.submit(task1);
                try {
                    // Tiempo de espera antes de interrumpir volver a intentarlo.
                    //Session s = future.get(self.timeoutInterval,TimeUnit.MILLISECONDS);

                    //Session s = future.get(30,TimeUnit.SECONDS);
                    //return s;
                    //self.session = future.get(30,TimeUnit.SECONDS);
                    session = future.get(timeoutInterval,TimeUnit.MILLISECONDS);

                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    // TODO Auto-generated catch block

                    //e.printStackTrace();          //For debug only

                    // Calculate Back off time.
                    float timeout = (float) (reconnectInterval * Math.pow(reconnectDecay,connectAttemptsCount));

                    if (maxConnectAttempts > 0 && ( connectAttemptsCount >= maxConnectAttempts )) {

                    int maxTimeReconnect = (int) (timeout > maxReconnectionInterval ? maxReconnectionInterval : timeout);

                    Callable<Boolean> timerReconnection = new Callable<Boolean>() {
                        public Boolean call() {

                            while(counter.get() >= 0) {
                                System.out.println("Time next reconection: "+counter.get()+" seconds");
                                System.out.println("ThreadId: "+Thread.currentThread().getId() );

                                // Avizar a la API el tiempo para la sig. reconexión.. Patron productor-consumidor.
                                // TODO Deberia de ser un hilo?? ESto ya que podria se r que algo en el metodo externo sea de tipo bloqueante.

                                if (counter.get() == 0 && debug ) {
                                    System.out.println("DEBUG: ReconnectingWebSocket connection-timeout: "+path.getPath());
                                try {
                                } catch (InterruptedException e) {
                                    // TODO Auto-generated catch block
                            return false;

                     *          scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS)
                     * 0 -> initialDelay,  this time is includen in the backoff algorithm calc, then this value is zero.
                     * timeout -> the delay between the termination of one execution and the commencement of the next
                    timerReconnectionFuture = executor.submit(timerReconnection);

                    try {
                        Boolean delayTime = timerReconnectionFuture.get();
                    } catch (InterruptedException | ExecutionException e1) {
                        // TODO Auto-generated catch block


            }while (!Thread.currentThread().isInterrupted()); //END while
            //while (openFlag.get()); //END while   

        };// END Runnable

        openFuture = executor2.submit(openRun);

    public void send(String str) {
        // Convertir el texto a objeto Message para enviarlo.

        if (readyState ==  WebSocketStates.OPEN) {
            if (debug) {
                LOG.debug("Sending to URL:\"{}\", Data:\n\"{}\" ",path.getPath(),JsonWriter.formatJson(str));
                //System.out.println("DEBUG: ReconnectingWebSocket sending to "+path.getPath()+": "+str);
            try {
            } catch (IOException e) {
                LOG.error("Sending to URL:\"{}\", Data:\"{}\" {}",path.getPath(),str,e);

     * @param j objeto Json a enviar por Websocket.
     * @throws EncodeException 
     * @throws IOException 
    public void send(Json j) {
        //System.out.println("ReconnectWebsocket:: send(Json INI)");
        send(new Message(j));
        //System.out.println("ReconnectWebsocket:: send(Json END)");

     * Transmits data to the server over Websocket connection.
     * @param data a text string, ArrayBuffer or Blob to send to the server.
     * @throws IOException 
     * @throws EncodeException 
    public void send(Message data) {
        //System.out.println("ReconnectWebsocket:: send(Message INI)");

        if (readyState ==  WebSocketStates.OPEN) {
            if (debug) {
                System.out.println("DEBUG: ReconnectingWebSocket send "+path.getPath()+": "+data);

            //System.out.println("ReconnectWebSocket::send(Message msg - Before)" );
            //System.out.println("ReconnectWebSocket::send(Message msg - Before - session is null="+(session==null?"true":"false" ));
            //System.out.println("ReconnectWebSocket::send(Message msg - Before - session is open="+session.isOpen());

            try {

                LOG.debug("Sending to URL:\"{}\", Data:\"{}\" ",path.getPath(),data);

                //System.out.println("REconnectWebSocket::send(Message msg -  After)" );
            } catch (IOException | EncodeException e) {
                LOG.error("Sending to URL:\"{}\", Data:\"{}\" {}",path.getPath(),data,e);
        // Deberia de detenerse la reconeccion en estos casos?, es decir detener despues de intentar enviar una
        //  cadena de texto pero que ha fallado.

        //System.out.println("ReconnectWebsocket:: send(Message END)");

    public void close(String reason) {

    public void close() {

    public void close(CloseReason.CloseCodes code) {

    /** Closes the Websocket connection or connection attempt, if any.
     *  If the connection is already CLOSED, this method does nothing.
    public void close(CloseReason.CloseCodes code, String reason) {

        if (readyState == WebSocketStates.CLOSED) {

        CloseReason closeReason;
        forcedClose = true;

         * Status code:     1000
         * Name:            CLOSE_NORMAL
         * Description:     The connection successfully completed whatever purpose for
         *                   which it was created.
        // Default CLOSE_NORMAL code
        if (code==null) {
            closeReason = new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"ReconnectingWebSocket STOP");
        } else if (reason!=null) {
            closeReason = new CloseReason(code,reason);
        } else {
            closeReason = new CloseReason(code,"ReconnectingWebSocket STOP");

        if ( (readyState == WebSocketStates.OPEN || readyState == WebSocketStates.CONNECTING) ) {

            // Change readyState status:
            readyState = WebSocketStates.CLOSED;

            if (session==null) {
                 * readyState == WebSocketStates.CONNECTING && session == null
                 * This ocurr when the server is off and the client is in a loop trying to connect.


            } else {
                 * readyState == WebSocketStates.OPEN && session != null
                 *  or
                 * readyState == WebSocketStates.CONNECTING && session != null

                try { // Permanent close. Called via the Close method.
                    if (session.isOpen()) {
                         * Session is previously closed when has connected at less one time, after the server shutdown
                         * and the reconnection beging. During the reconnection if you try to close (forced close)
                         * then session.close will thorwn a error.
                         * To fix we have verificate if the session is closed.


                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    LOG.error("Error cerrando sesion. {}",e);




     * Additional public API method to refresh the connection if still is open.
     * After close, the websocket will try to reconnect.
     * For example, if the app suspects bad data / missed heart beats, it can try to refresh.
    public void refresh() {
        if (readyState == WebSocketStates.OPEN) {
            try {
                session.close(new CloseReason(CloseCodes.SERVICE_RESTART, "refresh!!"));
            } catch (IOException e) {
                LOG.error("Error cerrando sesion. {}",e);

        } else {
            // Stop timer of reconnection.
            if (readyState == WebSocketStates.CONNECTING) {

                close(CloseCodes.SERVICE_RESTART, "refresh!!");

                // Reset variables.
                connectAttemptsCount = 0;




    public Session getSession(){
        return session;

    public WebSocketStates getReadyState() {
        return readyState;

     * El observador de los cambios en el clente Websocket interno.
    public void update(InternalMessageWs msg) {
        switch (msg.getType()) {
        case ONOPEN:
            // Cambiar estado de la conexión
            readyState = WebSocketStates.OPEN;

            if (debug ) {
                System.out.println("DEBUG: ReconnectingWebSocket onOpen: "+path.getPath());

            // Ejecutar las funciones onOPen que el usuario ha definido.
            logicExternal.onOpen( ((OnOpenEvent)msg.getEvent()).getSession() );
        case ONMESSAGE:

            if (debug) {
                System.out.println("DEBUG: ReconnectingWebSocket onMessage: "+path.getPath());

            OnMessageEvent evtMsg = (OnMessageEvent)msg.getEvent();

            // Ejecutar las funciones OnMessage que el usuario ha definido.
        case ONCLOSE:

            if (debug ) {
                System.out.println("DEBUG: ReconnectingWebSocket onClose: "+path.getPath()+" forcedClose="+forcedClose);

            // Cambiar estado de la conexión
            readyState = WebSocketStates.CLOSED;

            OnCloseEvent evtClose = (OnCloseEvent)msg.getEvent();
            // Ejecutar las funciones OnClose que el usuario ha definido.

             * Determinar si se debe vlver a conectar o no.
             * Si forcedClose = true, entonces detener.
             * Si forcedClose = false, entonces reconectar.
            if (!forcedClose) {
                if (debug ) {
                    System.out.println("DEBUG: ReconnectingWebSocket STOP Reconnectiing: "+path.getPath());

                forcedClose = false;
        case ONERROR:

            if (debug ) {
                System.out.println("DEBUG: ReconnectingWebSocket onError: "+path.getPath());

            // Cambiar estado de la conexión
            readyState = WebSocketStates.CLOSED;

            OnErrorEvent evtError = (OnErrorEvent)msg.getEvent();
            // Ejecutar las funciones OnError que el usuario ha definido.

            // Volver a iniciar secuencia de conectar.
            // Algunos prfieren cerrar la conexion.
        case ONCONNECTING:

            if (debug ) {
                System.out.println("DEBUG: ReconnectingWebSocket onConnecting: "+path.getPath());

            OnConnectingEvent evtConnecting = (OnConnectingEvent)msg.getEvent();
            // Ejecutar las funciones OnConnecting que el usuario ha definido.


    public void watcherReconnectionTry() {

    public void watcherTimeLeft(int timeLeft) {



public interface ReconnectObserver {
    public void update(InternalMessageWs msg);

    public void watcherReconnectionTry();
    public void watcherTimeLeft(int timeLeft);


ReconnectObservable class:

import java.util.ArrayList;

public class ReconnectObservable implements ReconnectSubject {
    private ArrayList<ReconnectObserver> observers;

    public ReconnectObservable() {
        observers = new ArrayList<ReconnectObserver>();

    public void addObserver(ReconnectObserver observer) {

    public void notifyObservers(InternalMessageWs msg) {
        for(ReconnectObserver observer : observers) {


ReconnectSubject 接口:

public interface ReconnectSubject {
    public void addObserver(ReconnectObserver observer);
    //public void notifyObservers();
    public void notifyObservers(InternalMessageWs msg);

InternalMessageWs class:

import javax.websocket.CloseReason;
import javax.websocket.Session;

public class InternalMessageWs {
    WsEventType type;
    Object event;
    InternalMessageWs(WsEventType type) {
        this.type = type;
        this.event = null;
    InternalMessageWs(WsEventType type, Object event) {
        this.type = type;
    public WsEventType getType() {
        return type;
    public void setType(WsEventType type) {
        this.type = type;
    public Object getEvent() {
        return event;
    public void setEvent(Object event) {
        this.event = event;

class OnOpenEvent {
    Session session;
    public OnOpenEvent(Session session) {
        this.session = session;
    public Session getSession() {
        return session;

    public void setSession(Session session) {
        this.session = session;
class OnMessageEvent {
    Message message;
    Session session;
    public OnMessageEvent(Session session, Message message) {
        this.message = message;
        this.session = session;
    public Message getMessage() {
        return message;
    public void setMessage(Message message) {
        this.message = message;
    public Session getSession() {
        return session;
    public void setSession(Session session) {
        this.session = session;
class OnCloseEvent {
    Session session;
    CloseReason reason;
    public OnCloseEvent(Session session, CloseReason reason) {
        this.session = session;
        this.reason = reason;
    public Session getSession() {
        return session;
    public void setSession(Session session) {
        this.session = session;
    public CloseReason getReason() {
        return reason;
    public void setReason(CloseReason reason) {
        this.reason = reason;
class OnErrorEvent {
    Session session;
    Throwable t;
    public OnErrorEvent(Session session,Throwable t) {
        this.t = t;
    public Throwable getT() {
        return t;
    public void setT(Throwable t) {
        this.t = t;
    public Session getSession() {
        return session;
    public void setSession(Session session) {
        this.session = session;
class OnConnectingEvent {
    String reason;

    public OnConnectingEvent(String reason) {
        this.reason = reason;

    public String getReason() {
        return reason;

    public void setReason(String reason) {
        this.reason = reason;
enum WsEventType {
    ONCONNECTING            // Using in Reconnecting state of the Websocket client.

您需要使用 Java 8 JDK 因为我使用可调用文件等