位于 httpd 代理之后的 TomEE websocket 连接超时

TomEE websocket behind an httpd proxy connection timeout

在开发中,我有一个 javascript 直接连接到 TomEE 的 websocket,并且 websocket 保持连接没有问题。

在 httpd 代理后面使用 TomEE 的生产环境中,连接在大约 30 秒后超时。

这是虚拟主机配置的相关部分

# Ordinary requests are forwarded to the backend over AJP.
ProxyPass / ajp://127.0.0.1:8009/ secret=xxxxxxxxxxxx
RewriteEngine on
# Requests carrying "Upgrade: websocket" and "Connection: upgrade" headers
# (a WebSocket handshake) are proxied to the ws:// backend on port 8080 instead.
RewriteCond %{HTTP:Upgrade} websocket [NC]
RewriteCond %{HTTP:Connection} upgrade [NC]
# $1 re-appends the path captured by (.*); without it every WebSocket request
# would be sent to "/" rather than to the endpoint the client asked for.
RewriteRule ^/?(.*) "ws://127.0.0.1:8080/$1" [P,L]

我已经尝试使用具备自动重连功能的 websocket npm 库,但它似乎会不断创建新的 websocket,直到 Chrome 内存耗尽。而原有的 websocket 一直停留在状态 101,而不是变为已完成。

我确实读到过防火墙会导致连接断开,但我搜索了 firewalld 和 websocket 的相关内容,却没有找到任何有用的信息。

看起来答案是实现“ping pong”。这可以防止防火墙或代理终止连接。

如果您对 websocket 的一端(客户端或服务器)发送 ping,规范要求对端必须以 pong 响应。但是 Javascript 的 websocket 取决于浏览器的实现,因此最好由服务器每 30 秒对所有客户端执行一次 ping。例如:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

/**
 * WebSocket endpoint that tracks every open session and supports a
 * server-driven ping/pong heartbeat.
 *
 * <p>The session registry and the "alive" markers are static, so they are
 * shared by all endpoint instances. A caller (for example a periodic timer)
 * is expected to invoke {@link #ping()} at a fixed interval: each call pings
 * the sessions that answered the previous ping and closes those that did not.
 */
@ServerEndpoint(value = "/websockets/admin/autoreply")
public class MyWebSocket {
    private static final Logger LOG = Logger.getLogger(MyWebSocket.class.getName());

    /** Application data carried by every ping frame; encoded once with an explicit charset. */
    private static final byte[] PING_PAYLOAD = "Ping".getBytes(StandardCharsets.UTF_8);

    /** All sessions currently registered with this endpoint. */
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());

    /** Ids of sessions that connected, or answered a ping, since they were last pinged. */
    private static final Set<String> alive = Collections.synchronizedSet(new HashSet<String>());

    /**
     * Registers a newly opened session and marks it as alive.
     *
     * @param session the session that was just opened
     */
    @OnOpen
    public void onOpen(Session session) throws IOException {
        sessions.add(session);
        alive.add(session.getId());
    }

    /**
     * Receives text messages from clients; incoming text is currently ignored.
     *
     * @param session the sending session
     * @param string  the received text
     */
    @OnMessage
    public void onMessage(Session session, String string) throws IOException {
        // Intentionally empty: this endpoint only pushes data to clients.
    }

    /**
     * Marks the session as alive again when its pong arrives.
     *
     * @param session     the session that answered
     * @param pongMessage the received pong frame (content is not inspected)
     */
    @OnMessage
    public void onPong(Session session, PongMessage pongMessage) throws IOException {
        alive.add(session.getId());
    }

    /**
     * Forgets a closed session, including its alive marker so ids do not accumulate.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) throws IOException {
        sessions.remove(session);
        alive.remove(session.getId());
    }

    /**
     * Logs transport or handler errors instead of silently dropping them.
     *
     * @param session   the session on which the error occurred
     * @param throwable the error
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        LOG.log(Level.WARNING, "WebSocket error on session " + session.getId(), throwable);
    }

    /**
     * Sends the given text to every registered session.
     *
     * @param string the text to send
     */
    public void broadcast(String string) {
        for (Session session : snapshot()) {
            broadcast(session, string);
        }
    }

    /** Sends text to one session; failures are logged and do not abort the broadcast. */
    private void broadcast(Session session, String string) {
        if (!session.isOpen()) {
            return;
        }
        try {
            session.getBasicRemote().sendText(string);
        } catch (IOException ex) {
            LOG.log(Level.WARNING, "Failed to send text to session " + session.getId(), ex);
        }
    }

    /**
     * Runs one heartbeat round over all registered sessions: sessions that are
     * marked alive are pinged (and the mark is cleared until their pong arrives),
     * sessions that are not marked alive are closed.
     */
    public void ping() {
        for (Session session : snapshot()) {
            ping(session);
        }
    }

    /** Pings one session if it is marked alive, otherwise closes it. */
    private void ping(Session session) {
        final String id = session.getId();
        if (!session.isOpen()) {
            // Already closed elsewhere; just make sure no bookkeeping is left behind.
            sessions.remove(session);
            alive.remove(id);
            return;
        }
        try {
            // remove() atomically tests and clears the marker, so no lock has to be
            // held while the (blocking) ping is written. If the send fails the
            // marker stays cleared and the next round closes the session.
            if (alive.remove(id)) {
                session.getBasicRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
            } else {
                session.close();
            }
        } catch (IOException ex) {
            LOG.log(Level.WARNING, "Heartbeat failed for session " + id, ex);
        }
    }

    /**
     * Returns a point-in-time copy of the registered sessions.
     *
     * <p>Iterating a copy matters because closing a session may trigger
     * {@link #onClose(Session)}, which removes it from {@code sessions}; doing
     * that while iterating the live set would risk a
     * {@code ConcurrentModificationException}. It also avoids holding the set's
     * lock during network I/O.
     */
    private static Session[] snapshot() {
        return sessions.toArray(new Session[0]);
    }
}

定时器服务看起来像这样

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.ScheduleExpression;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;

import org.apache.tomcat.websocket.server.DefaultServerEndpointConfigurator;

import tld.domain.api.websockets.MyWebSocket;

/**
 * Startup singleton that schedules a calendar timer firing every 30 seconds
 * and, on each expiry, asks a {@link MyWebSocket} endpoint instance to ping
 * its sessions.
 */
@Singleton
@Lock(LockType.READ)
@Startup
public class HeartbeatTimer {

    /** Info object attached to the timer so its expirations can be recognised. */
    private static final String HEARTBEAT_INFO = "heartbeat";

    @Resource
    private TimerService timerService;

    /** Creates the non-persistent heartbeat timer once the bean has been constructed. */
    @PostConstruct
    private void construct() {
        final ScheduleExpression everyThirtySeconds = new ScheduleExpression()
                .hour("*")
                .minute("*")
                .second("*/30");
        timerService.createCalendarTimer(everyThirtySeconds, new TimerConfig(HEARTBEAT_INFO, false));
    }

    /**
     * Timer callback: for the heartbeat timer, obtains an endpoint instance and
     * triggers a ping round; expirations of any other timer are ignored.
     *
     * @param timer the timer that expired
     */
    @Timeout
    public void timeout(Timer timer) {
        if (!HEARTBEAT_INFO.equals(timer.getInfo())) {
            return;
        }
        try {
            new DefaultServerEndpointConfigurator()
                    .getEndpointInstance(MyWebSocket.class)
                    .ping();
        } catch (InstantiationException e) {
            e.printStackTrace();
        }
    }
}