Got a java.lang.IllegalArgumentException when sending a Java object through the dynamic TCP/IP integration flow?

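Some time ago Gary Russell helped me with the following 'DynamicTcpServer' flow. It now has a message service injected, which supplies the messages to send as soon as a client has connected: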

public class DynamicTcpServer implements TcpServer {

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<String, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, String> clients = new ConcurrentHashMap<>();

    private final Map<String, TcpServerSpec> sockets;

    private final MessageService messenger;

    @Autowired
    public DynamicTcpServer(MessageService messenger, Map<String, TcpServerSpec> sockets) {
        this.messenger = messenger;
        this.sockets = sockets;
    }

    @Override
    public void start(String context) {
        start(context, sockets.get(context).getPort());
    }

    @Override
    public void start(String context, int port) {
        if (this.registrations.containsKey(context)) {
            /* already running */
        }
        else {
            TcpServerConnectionFactorySpec server = Tcp.netServer(port).id(context).serializer(TcpCodecs.lf());
            server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
            IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
            this.registrations.put(context, flowContext.registration(flow).register());
        }
    }

    @Override
    public Set<String> running() {
        return registrations.keySet();
    }

    @Override
    public void stop(String context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context);
        if (registration != null) {
            registration.destroy();
        }
    }

    @EventListener
    public void connect(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.clients.put(connectionId, event.getConnectionFactoryName());
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
    }

    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
    }

    @Scheduled(
            fixedDelayString = "${com.harry.potter.scheduler.fixed-delay}",
            initialDelayString = "${com.harry.potter.scheduler.initial-delay}"
    )
    public void sender() {
        this.clients.forEach((connectId, context) -> {
            IntegrationFlowRegistration register = registrations.get(context);
            if (register != null) {
                try {
                    while (true) {
                        List<ServerMessage> msgs = messenger.getMessagesToSend(sockets.get(context));
                        msgs.stream().forEach(msg -> 
                                register.getMessagingTemplate().send(
                                        MessageBuilder.withPayload(msg)
                                                .setHeader(IpHeaders.CONNECTION_ID, connectId).build()));       
                    }
                }
                catch (NoMessageToSendException nm) {
                    appContext.getBean(context, TcpNetServerConnectionFactory.class)
                            .closeConnection(connectId);    
                }
            }
        });
    }
}

The message service returns a Java object 'com.harry.potter.entity.ServerMessage' to send. So I assume I have to add some other kind of converter at '.serializer(TcpCodecs.lf())', because I get an exception saying:

2022-04-17 04:00:45.729 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.cas.service.DynamicTcpServer     : sender: send 1 messages to potter1
2022-04-17 04:00:45.738 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.c.service.DynamicTcpServer     : closed event=TcpConnectionCloseEvent [source=TcpNetConnection:harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae], [factory=potter1, connectionId=harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae] **CLOSED**
2022-04-17 04:00:45.740 ERROR [] --- [pool-283-thread-1]  o.s.i.ip.tcp.TcpSendingMessageHandler    : Error sending message

org.springframework.messaging.MessagingException: Send Failed; nested exception is java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender(DynamicTcpServer.java:125)
    at com.harry.potter.service.DynamicTcpServer$$Lambda600/0x000000006f511b08.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender(DynamicTcpServer.java:124)
    at com.harry.potter.service.DynamicTcpServer$$Lambda552/0x000000003344f3b0.accept(Unknown Source)
    at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
    at com.harry.potter.service.DynamicTcpServer.sender(DynamicTcpServer.java:115)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:884)
Caused by: java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.getPayloadAsBytes(TcpMessageMapper.java:277)
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.fromMessage(TcpMessageMapper.java:252)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:111)
    ... 34 common frames omitted

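Which converter (serializer) do I have to use, and how exactly do I plug it into my DynamicTcpServer?

EDIT 1

The message service messenger returns a Java object 'com.harry.potter.entity.ServerMessage' to send. ServerMessage contains an int field holding the message length and a String field holding the message text: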

public class ServerMessage implements Serializable {
    private static final long serialVersionUID = -1L;

    private int len;
    private String message;

    /* getters & setters */
}

I am trying to migrate from a C/C++ function that writes a C struct

struct C_MSG
{
    int len;                        /* Length field */
    char text[MAX_MSG_LEN];         /* Data field   */
} c_msg;

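to the consumer, using the send function of the C socket library to write a given number of bytes (text length + 4) from a given memory address to a given TCP/IP socket.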

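I am looking for a Transformer that prepares the same binary content for the message consumer. Otherwise the consumer will not be able to process the message.

Following the comments and looking at GenericTransformer<S, T>, the transformation can be done in a single lambda expression. The source of the transformation would be the ServerMessage object? The result should be a byte array, produced with Spring's utility: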

.transform(s -> SerializationUtils.serialize(s))

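Would that be the lambda expression? Or do I need a custom Transformer implementing a specific interface, to get better control over the serialization in case my consumer expects Intel or Motorola byte order? Which interface would that be? Or is there a simpler solution?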

You need a .transform() before the .handle(Tcp.outboundAdapter(server)); to convert your ServerMessage into a byte[] or a String. That is what the TcpMessageMapper expects by default.
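
To make the transform step concrete for the C consumer described in the question, here is a minimal sketch of the byte[] it could produce: a 4-byte length followed by the text bytes. CMsgEncoder and encode are hypothetical names, not Spring Integration API. The sketch assumes that the C int is 4 bytes wide, that len counts the text bytes, and that the text uses a single-byte encoding (ISO-8859-1 here). The byte order is a parameter, since the question leaves open whether the consumer expects Intel (little-endian) or Motorola (big-endian) order.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Spring Integration.
final class CMsgEncoder {

    private CMsgEncoder() {
    }

    /**
     * Encodes the text as a 4-byte length field followed by the text bytes,
     * i.e. the first (text length + 4) bytes of the C_MSG struct.
     */
    static byte[] encode(String text, ByteOrder order) {
        // Assumption: the consumer reads the text as single-byte characters.
        byte[] body = text.getBytes(StandardCharsets.ISO_8859_1);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + body.length).order(order);
        buffer.putInt(body.length); // len field, written in the requested byte order
        buffer.put(body);           // text field, without padding or terminator
        return buffer.array();
    }
}
```

In the flow this could be wired as something like .transform(ServerMessage.class, m -> CMsgEncoder.encode(m.getMessage(), ByteOrder.LITTLE_ENDIAN)) placed before the .handle(...), assuming ServerMessage exposes a getMessage() getter. Note also that TcpCodecs.lf() appends a line feed after every payload, so for a purely binary consumer a serializer such as TcpCodecs.raw() may be the closer match; that is worth checking against what the consumer actually reads.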

Sure, I could also suggest looking into the Tcp.netServer().mapper(TcpMessageMapper mapper) option and its bytesMessageMapper property, but the result would be the same.
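
As for the .transform(s -> SerializationUtils.serialize(s)) idea from the question: Spring's SerializationUtils uses standard Java serialization, and that format is not the C struct layout. The plain-JDK sketch below shows this without any Spring dependency. JavaSerializationProbe and its nested Msg class are hypothetical stand-ins for the real ServerMessage.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo class; it only uses the JDK's own serialization.
final class JavaSerializationProbe {

    private JavaSerializationProbe() {
    }

    // Stand-in with the same two fields as the question's ServerMessage.
    static final class Msg implements Serializable {
        private static final long serialVersionUID = -1L;

        final int len;
        final String message;

        Msg(String message) {
            this.message = message;
            this.len = message.length();
        }
    }

    /** Serializes a value with standard Java serialization and returns the raw bytes. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The resulting array starts with the Java stream header (0xAC 0xED 0x00 0x05) and also carries class metadata, so it is considerably longer than text length + 4. A C consumer expecting C_MSG would presumably not be able to parse it, which is why a hand-written byte layout looks like the safer route here.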