Got a java.lang.IllegalArgumentException when sending a Java object through a dynamic TCP/IP integration flow?
Some time ago Gary Russell helped me with the following 'DynamicTcpServer' flow. I have now injected a message service that provides the messages to send as soon as a client connects:
    public class DynamicTcpServer implements TcpServer {

        @Autowired
        private IntegrationFlowContext flowContext;

        @Autowired
        private ApplicationContext appContext;

        private final Map<String, IntegrationFlowRegistration> registrations = new HashMap<>();
        private final Map<String, String> clients = new ConcurrentHashMap<>();
        private final Map<String, TcpServerSpec> sockets;
        private final MessageService messenger;

        @Autowired
        public DynamicTcpServer(MessageService messenger, Map<String, TcpServerSpec> sockets) {
            this.messenger = messenger;
            this.sockets = sockets;
        }

        @Override
        public void start(String context) {
            start(context, sockets.get(context).getPort());
        }

        @Override
        public void start(String context, int port) {
            if (this.registrations.containsKey(context)) {
                /* already running */
            }
            else {
                TcpServerConnectionFactorySpec server = Tcp.netServer(port).id(context).serializer(TcpCodecs.lf());
                server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
                IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
                this.registrations.put(context, flowContext.registration(flow).register());
            }
        }

        @Override
        public Set<String> running() {
            return registrations.keySet();
        }

        @Override
        public void stop(String context) {
            IntegrationFlowRegistration registration = this.registrations.remove(context);
            if (registration != null) {
                registration.destroy();
            }
        }

        @EventListener
        public void connect(TcpConnectionOpenEvent event) {
            String connectionId = event.getConnectionId();
            this.clients.put(connectionId, event.getConnectionFactoryName());
        }

        @EventListener
        public void closed(TcpConnectionCloseEvent event) {
            this.clients.remove(event.getConnectionId());
        }

        @EventListener
        public void listening(TcpConnectionServerListeningEvent event) {
        }

        @Scheduled(
            fixedDelayString = "${com.harry.potter.scheduler.fixed-delay}",
            initialDelayString = "${com.harry.potter.scheduler.initial-delay}"
        )
        public void sender() {
            this.clients.forEach((connectId, context) -> {
                IntegrationFlowRegistration register = registrations.get(context);
                if (register != null) {
                    try {
                        // loops until the message service signals that nothing is left to send
                        while (true) {
                            List<ServerMessage> msgs = messenger.getMessagesToSend(sockets.get(context));
                            msgs.stream().forEach(msg ->
                                register.getMessagingTemplate().send(
                                    MessageBuilder.withPayload(msg)
                                        .setHeader(IpHeaders.CONNECTION_ID, connectId).build()));
                        }
                    }
                    catch (NoMessageToSendException nm) {
                        appContext.getBean(context, TcpNetServerConnectionFactory.class)
                            .closeConnection(connectId);
                    }
                }
            });
        }
    }
The message service returns a Java object, 'com.harry.potter.entity.ServerMessage', to be sent.
So I assume I have to add some other kind of converter at '.serializer(TcpCodecs.lf())', because I get an exception saying:
2022-04-17 04:00:45.729 DEBUG [] --- [pool-283-thread-1] c.l.c.c.cas.service.DynamicTcpServer : sender: send 1 messages to potter1
2022-04-17 04:00:45.738 DEBUG [] --- [pool-283-thread-1] c.l.c.c.c.service.DynamicTcpServer : closed event=TcpConnectionCloseEvent [source=TcpNetConnection:harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae], [factory=potter1, connectionId=harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae] **CLOSED**
2022-04-17 04:00:45.740 ERROR [] --- [pool-283-thread-1] o.s.i.ip.tcp.TcpSendingMessageHandler : Error sending message
org.springframework.messaging.MessagingException: Send Failed; nested exception is java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118)
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119)
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99)
at com.harry.potter.service.DynamicTcpServer.lambda$sender(DynamicTcpServer.java:125)
at com.harry.potter.service.DynamicTcpServer$$Lambda600/0x000000006f511b08.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at com.harry.potter.service.DynamicTcpServer.lambda$sender(DynamicTcpServer.java:124)
at com.harry.potter.service.DynamicTcpServer$$Lambda552/0x000000003344f3b0.accept(Unknown Source)
at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at com.harry.potter.service.DynamicTcpServer.sender(DynamicTcpServer.java:115)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:884)
Caused by: java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.getPayloadAsBytes(TcpMessageMapper.java:277)
at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.fromMessage(TcpMessageMapper.java:252)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:111)
... 34 common frames omitted
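Which converter (serializer) do I have to use, and how exactly do I plug it into my DynamicTcpServer?
EDIT 1
The message service messenger returns a Java object, 'com.harry.potter.entity.ServerMessage', to be sent. ServerMessage contains an int field holding the message length and a String field holding the message text: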
    public class ServerMessage implements Serializable {
        private static final long serialVersionUID = -1L;
        private int len;
        private String message;
        /* getters & setters */
    }
I am trying to migrate from a C/C++ function that writes this C struct:
    struct C_MSG
    {
        int len;                 /* Length field */
        char text[MAX_MSG_LEN];  /* Data field */
    } c_msg;
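The function sends it to the consumer with the C socket library's send function, which writes a given number of bytes (the text length + 4) from a given memory address to a given TCP/IP socket.
I am looking for a Transformer that prepares the same binary content for the message consumer. Otherwise the consumer will not be able to process the message.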
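The layout described above (a 4-byte length followed by the text bytes) can be sketched in plain Java with java.nio.ByteBuffer. This is only a sketch under assumptions the question does not state: the class name StructEncoder is hypothetical, the text is assumed to be in a single-byte charset (ISO-8859-1 here), and the byte order of the length field is left as a parameter because it depends on the platform the C code ran on.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StructEncoder {

    /**
     * Encodes a 4-byte length followed by the text bytes,
     * with no padding and no terminator (text length + 4 bytes in total).
     */
    public static byte[] encode(String text, ByteOrder order) {
        // Assumption: the consumer reads single-byte characters.
        byte[] body = text.getBytes(StandardCharsets.ISO_8859_1);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + body.length).order(order);
        buf.putInt(body.length); // length field, written in the requested byte order
        buf.put(body);           // data field
        return buf.array();
    }

    public static void main(String[] args) {
        // prints [5, 0, 0, 0, 72, 69, 76, 76, 79]
        System.out.println(Arrays.toString(encode("HELLO", ByteOrder.LITTLE_ENDIAN)));
        // prints [0, 0, 0, 5, 72, 69, 76, 76, 79]
        System.out.println(Arrays.toString(encode("HELLO", ByteOrder.BIG_ENDIAN)));
    }
}
```

If the original C code ran on an x86 machine, the int would most likely have been written little-endian, as in the first output line; comparing against a capture of the real traffic would confirm which variant the consumer expects.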
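Following the comments and looking at GenericTransformer<S, T>, the transformation could be done in a single lambda expression. The source of the transformation would be a ServerMessage object? The result should be a byte array, produced with Spring's utility: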
.transform(s -> SerializationUtils.serialize(s))
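Would that be the lambda expression? Or do I need a custom Transformer that implements a specific interface, to get finer control over the serialization process in case my consumer expects Intel or Motorola byte order (little-endian or big-endian, respectively)? Which interface? Or is there a simpler solution?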
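Whether SerializationUtils.serialize yields what the C consumer expects can be checked without Spring, on the assumption that the utility delegates to standard Java serialization through ObjectOutputStream. The sketch below uses a stand-in ServerMessage and a hypothetical class name, JavaSerializationCheck. It shows that a Java serialization stream starts with the magic bytes AC ED and carries class metadata, so it is not the 4-byte-length-plus-text layout of the C struct.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class JavaSerializationCheck {

    // Stand-in for the ServerMessage class from the question.
    public static class ServerMessage implements Serializable {
        private static final long serialVersionUID = -1L;
        private final int len;
        private final String message;

        public ServerMessage(String message) {
            this.message = message;
            this.len = message.length();
        }
    }

    // Plain Java serialization of an object into a byte array.
    public static byte[] javaSerialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = javaSerialize(new ServerMessage("HELLO"));
        // prints "first bytes: AC ED" (the Java serialization stream magic)
        System.out.printf("first bytes: %02X %02X%n", data[0], data[1]);
        // The C struct layout for "HELLO" would be 4 + 5 = 9 bytes.
        System.out.println("longer than struct layout: " + (data.length > 9));
    }
}
```

A consumer written against the C struct would therefore read the stream header as its length field, which suggests that a hand-written conversion is the safer route here.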
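You need to consider a .transform() before .handle(Tcp.outboundAdapter(server)); to convert your ServerMessage into a byte[] or a String. That is what the TcpMessageMapper expects by default.
Of course, I could suggest looking at the mapper(TcpMessageMapper mapper) option of Tcp.netServer() and its bytesMessageMapper property, but the result would be the same.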
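The transform step suggested above can be modelled in plain Java. In the sketch below, TransformStepSketch, toBytes and acceptedByDefaultMapper are hypothetical names: toBytes is the kind of function a .transform() step could apply, and acceptedByDefaultMapper only mirrors the rule quoted in the exception message; it is not Spring's implementation. The length is written big-endian (the ByteBuffer default), which is an assumption about the consumer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class TransformStepSketch {

    // Stand-in for com.harry.potter.entity.ServerMessage (only the text is needed here).
    public static class ServerMessage {
        private final String message;

        public ServerMessage(String message) {
            this.message = message;
        }

        public String getMessage() {
            return message;
        }
    }

    // Conversion a transform step could apply before the outbound adapter.
    // With the Spring Integration 5.x DSL this might be wired as (assumption, not verified here):
    //   f.transform(ServerMessage.class, TransformStepSketch::toBytes)
    //    .handle(Tcp.outboundAdapter(server))
    public static byte[] toBytes(ServerMessage msg) {
        byte[] text = msg.getMessage().getBytes(StandardCharsets.ISO_8859_1); // assumed charset
        return ByteBuffer.allocate(Integer.BYTES + text.length) // big-endian by default
                .putInt(text.length)
                .put(text)
                .array();
    }

    // Mirrors the rule from the exception message: only byte[] or String payloads pass.
    public static boolean acceptedByDefaultMapper(Object payload) {
        return payload instanceof byte[] || payload instanceof String;
    }

    public static void main(String[] args) {
        ServerMessage msg = new ServerMessage("HELLO");
        Function<ServerMessage, byte[]> transform = TransformStepSketch::toBytes;
        System.out.println(acceptedByDefaultMapper(msg));                  // prints false
        System.out.println(acceptedByDefaultMapper(transform.apply(msg))); // prints true
    }
}
```

One further point to verify against the Spring Integration reference: TcpCodecs.lf() is understood to append a line feed after every payload, which a consumer expecting only the struct bytes would not anticipate. A raw or length-header codec may fit a binary protocol better.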