立即取消 运行 Java websocket 传输

Cancel a running Java websocket transmission without delay

问题描述有点长,先问我的问题:

当通过websocket发送二进制数据时,如何立即取消运行传输?

背景:

我有一个作为 GIS(地理信息系统)工作的 JavaScript 客户端。它的外观和感觉类似于 Google 地图,具有地图 window 用户可以通过鼠标拖动和缩放来导航。 例如,如果用户移动地图,新坐标将通过 websocket 发送到远程 Java 进程。 Java 进程现在创建一个新的地图图像并将其发送给客户端。在图像构建期间,它还会发送未完成的中间图像,因此客户端不必等待太久。 如果客户端现在快速连续多次移动地图,则新查询可能会到达 Java 后端,而前一个查询仍在处理中。先前的过程现在会将过时的图像发送给客户端。因此,如果查询到达 Java 后端,则必须中止来自该客户端的所有先前查询的处理,并丢弃其结果。

我必须确保两件事。如果有新查询到达,

尤其是后一个给我带来了问题。我目前的解决方案是这样的:

import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import javax.imageio.ImageIO;
import javax.servlet.http.HttpSession;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/asyncImage")
public class AsynchronousImageWebSocket {

    private Map<Session, Future<?>> futures = new HashMap<>();

    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        
        // Start the image generating process, 
        // passing the current websocket session as client ID.
        startImageGeneratingProcess(session);
    }

    // Called by the image generating process to send intermediate or final images to 
    // the client.
    // It also passes the websocket session that has been passed to startImageGeneratingProcess.
    public void sendImage(BufferedImage img, Session session) {
        if (futures.containsKey(session)) {
            if (!futures.get(session).isDone()) {
                
                // If a Future is currently stored for that session and it's not done yet,
                // this means that there already is a running image generating process from
                // a previous query. Cancel it.
                futures.get(session).cancel(true);
                logger.info("SEND cancelled");
            }
            futures.remove(session);
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream();) {
            ImageIO.write(img, "PNG", out);

            // Send the image, store the returned Future and associate it with the 
            // client's session.
            Future<?> f = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(out.toByteArray()));
            futures.put(session, f);
        } catch (IOException e) {
            logger.error(e);
        } catch (Exception e) {
            logger.error(e);
        }
    }
}

不幸的是,Future 的取消方法似乎没有被评估。 运行 sendBinary 方法完成,即使我在它的 Future 上调用取消。有没有办法立即取消旧处理作业的 运行 sendBinary 方法?

感谢您的意见,如果您还需要什么,请告诉我。

p.s。另一个想法是简单地继续发送所有内容,并以某种方式让客户端识别和整理已弃用的图像。但是,生成和发送已弃用的图像会消耗大量资源,我想节省这些资源。

对不起,那完全是我的错。 websocket 没有阻塞。是不同地方的同步导致发送同步执行。

如果你在没有任何同步代码的情况下执行我上面描述的 sendImage 方法,它应该不会阻塞地执行。相反,如果 session.getAsyncRemote().sendBinary 同时被两个线程并发执行,它们都会抛出 IllegalStateException 并中止发送。在这种情况下,可以捕获该异常,丢弃它并简单地重新发送最后一张图像。

基于此,我按以下方式更改了我的 sendImage 方法:

public void temporaryImageReady(BufferedImage img, Session session) {
    if (futures.containsKey(session)) {
        if (!futures.get(session).isDone()) {
            futures.get(session).cancel(true);
            logger.info("SEND cancelled");
        }
        futures.remove(session);
    }

    try {
        send(img, handle);
    } catch (IOException e) {
        logger.error(e);
    } catch (IllegalStateException e) {
        logger.info("Image send collision, resend last image.");
        try {
            send(img, handle);
        } catch (Exception e1) {
            logger.info("Image resend after collision failed.");
            logger.error(e);
        }
    } catch (Exception e) {
        logger.error(e);
    }
}
    
private void send(BufferedImage img) throws Exception {
    try (ByteArrayOutputStream out = new ByteArrayOutputStream();) {
        ImageIO.write(img, "PNG", out);
        Future<?> f = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(out.toByteArray()));
        futures.put(session, f);
    }
}