Spring-websocket 将命令结果流式传输回浏览器

Spring-websocket to stream results of a command back to browser

我正在尝试将 ping 命令的结果流式传输回浏览器...我真的很希望用户可以看到命令 运行 而不是只看到最终结果.这是我所拥有的(它是 Groovy 顺便说一句,这并不重要):)

@Controller
class IntranetWebsocketController {

    @MessageMapping("/ping")
    @SendToUser(destinations = "/topic/ping", broadcast = false)
    static ResponseEntity (Map<String, String> address) {
        def builder = new ProcessBuilder("/bin/ping", "-c", "10", "-s", "1400", "-W", "200", "-i", "0.2", "-D", "-O", address.get("ip"))
        builder.redirectErrorStream(true)
        def process = builder.start()
        ResponseEntity.ok().body(new InputStreamResource(process.inputStream))
    }
}

但后来我明白了:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class java.lang.UNIXProcess$ProcessPipeInputStream and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.http.ResponseEntity["body"]->org.springframework.core.io.InputStreamResource["inputStream"])
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:275) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.SerializerProvider.mappingException(SerializerProvider.java:1109) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.SerializerProvider.reportMappingProblem(SerializerProvider.java:1134) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:69) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:32) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:693) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:690) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:693) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:690) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:292) ~[jackson-databind-2.8.1.jar:2.8.1]
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2484) ~[jackson-databind-2.8.1.jar:2.8.1]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:240) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    ... 17 common frames omitted

关于如何解决这个问题的任何想法?显然这是一个映射问题,但我不知道如何处理这个问题。有没有办法告诉映射器将所有内容都作为字符串发送?

[编辑 1]

根据 benjamin.d 所说的,我将代码稍微更改为以下内容:

@Controller
class IntranetWebsocketController {
    @Autowired
    SimpMessagingTemplate template

    @MessageMapping("/ping")
    void pingSend (Map<String, String> ipaddress, Principal principal) {
        def builder = new ProcessBuilder("/bin/ping", "-c", "10", "-s", "1400", "-W", "200", "-i", "0.2", "-D", "-O", ipaddress.get("ip"))
        builder.redirectErrorStream(true)
        def process = builder.start()

        def inputReader = new InputStreamReader(process.inputStream)
        def bufferedReader = new BufferedReader(inputReader)

        def line
        while ((line = bufferedReader.readLine()) != null) {
            template.convertAndSendToUser(principal.name, "/topic/ping", line)
        }
    }
}

现在唯一的问题是,如果我打开了多个选项卡来监听这个 websocket,它们都会在接收到数据时做出反应,因为我删除了@SendToUser 注释。但那是另一回事。

[更新] 对于那些对此主题感兴趣的人,这是我最终做的事情:

@Service
class ThreadedPinger {
    @Async
    Future<Process> run(SimpMessagingTemplate template, Principal principal, String hash, String ip) {

        def builder = new ProcessBuilder("/usr/bin/sudo", "/bin/ping", "-c", "500", "-s", "1400", "-W", "200", "-i", "0.001", "-D", "-O", ip)
        builder.redirectErrorStream(true)

        def process = builder.start()
        def bufferedReader = new BufferedReader(new InputStreamReader(process.inputStream))

        def line
        while ((line = bufferedReader.readLine()) != null) {
            if (Thread.interrupted()) {
                process.destroyForcibly()
                break
            }
            template.convertAndSendToUser(principal.name, "/topic/ping", read)
        }
        bufferedReader.close()
        new AsyncResult<Process>(process)
    }
}


@Controller
class IntranetWebsocketController {
    @Autowired
    SimpMessagingTemplate template
    @Autowired
    ThreadedPinger threadedPinger

    @MessageMapping("/ping")
    void startPing(Map<String, String> request, Principal principal, SimpMessageHeaderAccessor headerAccessor) {
        def pinger = threadedPinger.run(template, principal, request.get("hash"), request.get("ip"))
        headerAccessor.sessionAttributes.put("pinger", pinger)
    }

    @MessageMapping("/pingclose")
    static void closePing(SimpMessageHeaderAccessor headerAccessor) {
        def pinger = (Future) headerAccessor.sessionAttributes.get("pinger")
        pinger.cancel(true)
    }
}

我添加了“/pingclose”消息映射,以便浏览器可以终止服务器端的线程。这样做有点麻烦,因为我无法在请求之间引用 "pinger" 变量,所以我将它添加到会话中。

Jackson 将寻找用于序列化的 getter 和带注释的方法。在你的情况下,它找不到任何东西,默认情况下它会因空 bean 而失败。

您可以使用以下方法禁用此功能:

mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

然而,此代码仍然无法工作,因为 Jackson 不会意识到新数据可用于序列化。相反,您将不得不启动一个线程来读取进程输入流(使用 InputStream 上的 read 方法)。读取方法将阻止执行,直到有新数据可用。 从 InputStream.read() 获取数据后,将其推送回您的 websocket 客户端。