Citrus 示例 TCP 发送和接收失败

Citrus Example TCP send and receive fails

我正在尝试通过 Citrus-Framework 发送和接收 TCP 消息,我在其中使用了这个线程作为参考:

我正在使用 python 消息中继器,它 returns 接收消息。我收到了带有 python 和 returns 的有效负载,但 citrus 超时了。我尝试了所有序列化程序(SingleTerminatior 除外,会导致上下文错误)。

我尝试了不同的序列化程序,但 none 似乎解决了我的问题,Citrus 总是超时。

15:03:58,013 DEBUG t.TestContextFactory| Created new test context - using global variables: '{}'
15:03:58,024 INFO  port.LoggingReporter| 
15:03:58,024 INFO  port.LoggingReporter|------------------------------------------------------------------------
15:03:58,024 DEBUG port.LoggingReporter| STARTING TEST sendSpringIntegrationMessageTest <com.consol.citrus.samples>
15:03:58,025 INFO  port.LoggingReporter| 
15:03:58,025 DEBUG      citrus.TestCase| Initializing test case
15:03:58,026 DEBUG  context.TestContext| Setting variable: citrus.test.name with value: 'sendSpringIntegrationMessageTest'
15:03:58,027 DEBUG  context.TestContext| Setting variable: citrus.test.package with value: 'com.consol.citrus.samples'
15:03:58,028 DEBUG      citrus.TestCase| Test variables:
15:03:58,028 DEBUG      citrus.TestCase| citrus.test.name = sendSpringIntegrationMessageTest
15:03:58,028 DEBUG      citrus.TestCase| citrus.test.package = com.consol.citrus.samples
15:03:58,029 INFO  port.LoggingReporter| 
15:03:58,030 DEBUG port.LoggingReporter| TEST STEP 1/2: send
15:03:58,049 DEBUG nnel.ChannelProducer| Sending message to channel: 'input'
15:03:58,055 DEBUG nnel.ChannelProducer| Message to send is:
DEFAULTMESSAGE [id: c5c61991-f567-42bd-9302-1f8e1fa16225, payload: Req][headers: {citrus_message_type=XML, citrus_message_id=c5c61991-f567-42bd-9302-1f8e1fa16225, citrus_message_timestamp=1539090238031}]
15:03:58,164 INFO  nnel.ChannelProducer| Message was sent to channel: 'input'
15:03:58,165 INFO  port.LoggingReporter| 
15:03:58,166 DEBUG port.LoggingReporter| TEST STEP 1/2 SUCCESS
15:03:58,166 INFO  port.LoggingReporter| 
15:03:58,166 DEBUG port.LoggingReporter| TEST STEP 2/2: receive
15:03:58,168 DEBUG nnel.ChannelConsumer| Receiving message from: replies
15:04:03,171 INFO  port.LoggingReporter| 
15:04:03,172 ERROR port.LoggingReporter| TEST FAILED sendSpringIntegrationMessageTest <com.consol.citrus.samples> Nested exception is:
at com.consol.citrus.exceptions.ActionTimeoutException: Action timeout while receiving message from channel 'replies'
at com.consol.citrus.channel.ChannelConsumer.receive(ChannelConsumer.java:97)
at com.consol.citrus.messaging.AbstractSelectiveMessageConsumer.receive(AbstractSelectiveMessageConsumer.java:50)
at com.consol.citrus.actions.ReceiveMessageAction.receive(ReceiveMessageAction.java:141)
at com.consol.citrus.actions.ReceiveMessageAction.doExecute(ReceiveMessageAction.java:120)
at com.consol.citrus.actions.AbstractTestAction.execute(AbstractTestAction.java:46)
at com.consol.citrus.dsl.actions.DelegatingTestAction.doExecute(DelegatingTestAction.java:54)
at com.consol.citrus.actions.AbstractTestAction.execute(AbstractTestAction.java:46)
at com.consol.citrus.TestCase.executeAction(TestCase.java:234)
at com.consol.citrus.TestCase.doExecute(TestCase.java:153)
at com.consol.citrus.actions.AbstractTestAction.execute(AbstractTestAction.java:46)
at com.consol.citrus.Citrus.run(Citrus.java:403)
at com.consol.citrus.dsl.testng.TestNGCitrusTest.invokeTestMethod(TestNGCitrusTest.java:125)
at com.consol.citrus.dsl.testng.TestNGCitrusTestDesigner.invokeTestMethod(TestNGCitrusTestDesigner.java:73)
at com.consol.citrus.dsl.testng.TestNGCitrusTest.run(TestNGCitrusTest.java:110)
...

我的上下文似乎是对的(我使用的是spring-integration-ip 5.0.8-RELEASE),执行测试时没有异常(使用SingleTerminatior除外):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       ... >

   <citrus:channel-endpoint id="citrusServiceInputEndpoint"
                        channel-name="input" />

   <citrus:channel-endpoint id="citrusServiceRepliesEndpoint"
                        channel-name="replies" />

   <int-ip:tcp-connection-factory id="client"
                              type="client" host="127.0.0.1"
                              port="33500" single-use="false"
                              so-timeout="10000" using-nio="true"
                              deserializer="javaSerializer"
                              serializer="javaSerializer" />

   <bean id="javaSerializer"
       class="org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer" />

    <int:channel id="input" />

    <int:channel id="replies">
     <int:queue />
    </int:channel>

    <int-ip:tcp-outbound-channel-adapter
       id="outboundClient" channel="input" connection-factory="client" />

    <int-ip:tcp-inbound-channel-adapter
       id="inboundClient" channel="replies" connection-factory="client" />

</beans>

感谢任何形式的帮助

谢谢

因为我是 spring 的新手,这里是我添加的依赖项:

 <dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-ip</artifactId>
  <version>5.0.8.RELEASE</version>
 </dependency>

这是 citrus 的调试输出:

22:47:41,071 DEBUG port.LoggingReporter| TEST STEP 1/2: send
22:47:41,085 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceInputEndpoint'
22:47:41,086 DEBUG nnel.ChannelProducer| Sending message to channel: 'input'
22:47:41,086 DEBUG nnel.ChannelProducer| Message to send is:
DEFAULTMESSAGE [id: 7d4f4c7a-92d0-462c-b695-c32fc7e697ae, payload: Req][headers: {citrus_message_type=XML, citrus_message_id=7d4f4c7a-92d0-462c-b695-c32fc7e697ae, citrus_message_timestamp=1539118061073}]
22:47:41,087 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'input'
22:47:41,091 DEBUG hannel.DirectChannel| preSend on channel 'input', message: 
GenericMessage [payload=Req, headers={citrus_message_timestamp=1539118061073, citrus_message_type=XML, id=33295f63-948f-6bcf-289f-d5e1df8dc98b, citrus_message_id=7d4f4c7a-92d0-462c-b695-c32fc7e697ae, timestamp=1539118061091}]
22:47:41,092 DEBUG endingMessageHandler| org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0 received message:
GenericMessage [payload=Req, headers={citrus_message_timestamp=1539118061073, citrus_message_type=XML, id=33295f63-948f-6bcf-289f-d5e1df8dc98b, citrus_message_id=7d4f4c7a-92d0-462c-b695-c32fc7e697ae, timestamp=1539118061091}]
22:47:41,092 DEBUG entConnectionFactory| Opening new socket connection to 127.0.0.1:33500
22:47:41,106 DEBUG ion.TcpNioConnection| New connection localhost:33500:55108:33be3b24-f5ae-4594-83e9-c7eb0f104b1f
22:47:41,110 DEBUG entConnectionFactory| client: Added new connection: localhost:33500:55108:33be3b24-f5ae-4594-83e9-c7eb0f104b1f
22:47:41,113 DEBUG endingMessageHandler| Got Connection localhost:33500:55108:33be3b24-f5ae-4594-83e9-c7eb0f104b1f
22:47:41,114 DEBUG ion.TcpNioConnection| localhost:33500:55108:33be3b24-f5ae-4594-83e9-c7eb0f104b1f writing 4
22:47:41,116 DEBUG ion.TcpNioConnection| localhost:33500:55108:33be3b24-f5ae-4594-83e9-c7eb0f104b1f Message sent GenericMessage [payload=Req, headers={citrus_message_timestamp=1539118061073, citrus_message_type=XML, id=33295f63-948f-6bcf-289f-d5e1df8dc98b, citrus_message_id=7d4f4c7a-92d0-462c-b695-c32fc7e697ae, timestamp=1539118061091}]
22:47:41,117 DEBUG channel.DirectChannel| postSend (sent=true) on channel 'input', message: GenericMessage [payload=Req, headers={citrus_message_timestamp=1539118061073, citrus_message_type=XML, id=33295f63-948f-6bcf-289f-d5e1df8dc98b, citrus_message_id=7d4f4c7a-92d0-462c-b695-c32fc7e697ae, timestamp=1539118061091}]
22:47:41,118 INFO  nnel.ChannelProducer| Message was sent to channel: 'input'
22:47:41,118 INFO  port.LoggingReporter| 
22:47:41,119 DEBUG port.LoggingReporter| TEST STEP 1/2 SUCCESS
22:47:41,119 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceRepliesEndpoint'
22:47:41,120 INFO  port.LoggingReporter| 
22:47:41,121 DEBUG port.LoggingReporter| TEST STEP 2/2: receive
22:47:41,122 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceRepliesEndpoint'
22:47:41,124 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'response'
22:47:41,125 DEBUG nnel.ChannelConsumer| Receiving message from: response
22:47:46,130 INFO  port.LoggingReporter| 
22:47:46,131 ERROR port.LoggingReporter| TEST FAILED
sendSpringIntegrationMessageTest <com.consol.citrus.samples> Nested exception is: 
com.consol.citrus.exceptions.ActionTimeoutException: Action timeout while receiving message from channel 'response'
    ...

我检查了 Wireshark 并返回了负载 ("Req"),包括换行符,可以在屏幕截图中看到。也许我应该提到我在 Ubuntu 虚拟机中 运行。

Wireshark screenshot

@CitrusTest(name = "sendSpringIntegrationMessageTest")
public void sendSpringIntegrationMessageTest() throws Exception {
    send("citrusServiceInputEndpoint").payload("Req");
    receive("citrusServiceRepliesEndpoint").payload("Req");
}

编辑

好的,我的 python 服务器似乎有问题。我稍微更改了代码以在控制台上查看接收到的字节(在我计算字节之前),突然 citrus 的输出发生了变化。

11:40:54,924 DEBUG port.LoggingReporter| TEST STEP 2/2: receive
11:40:54,925 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceRepliesEndpoint'
11:40:54,925 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'response'
11:40:54,925 DEBUG nnel.ChannelConsumer| Receiving message from: response
11:40:54,928 DEBUG ion.TcpNioConnection| localhost:33500:55890:7537f5ed-f447-4b76-ba66-2ec59c7619a9 Reading...
11:40:54,929 DEBUG ion.TcpNioConnection| localhost:33500:55890:7537f5ed-f447-4b76-ba66-2ec59c7619a9 Running an assembler
11:40:54,929 DEBUG ion.TcpNioConnection| Read 4 into raw buffer
Exception in thread "pool-1-thread-3" java.lang.AbstractMethodError: org.springframework.integration.ip.tcp.connection.TcpMessageMapper.toMessage(Ljava/lang/Object;)Lorg/springframework/messaging/Message;
    at org.springframework.integration.ip.tcp.connection.TcpNioConnection.convert(TcpNioConnection.java:358)
    at org.springframework.integration.ip.tcp.connection.TcpNioConnection.run(TcpNioConnection.java:235)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
11:40:54,937 DEBUG ion.TcpNioConnection| localhost:33500:55890:7537f5ed-f447-4b76-ba66-2ec59c7619a9 Reading...
11:40:54,938 DEBUG ion.TcpNioConnection| Read 0 into raw buffer
11:40:59,930 INFO  port.LoggingReporter| 
11:40:59,931 ERROR port.LoggingReporter| TEST FAILED sendSpringIntegrationMessageTest <com.consol.citrus.samples> Nested exception is: 
com.consol.citrus.exceptions.ActionTimeoutException: Action timeout while receiving message from channel 'response'

EDIT2

由于以下 post: How to use Spring Integration 5 with Spring Boot 1.5.x 我从 spring-integration-ip 5.0.8 切换到 4.3.9,输出又改变了一次。现在我的问题似乎从 TCP 问题转移到了实际的 spring 技术诀窍。

15:04:12,318 DEBUG port.LoggingReporter| TEST STEP 2/2: receive
15:04:12,319 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceRepliesEndpoint'
15:04:12,319 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'replies'
15:04:12,319 DEBUG nnel.ChannelConsumer| Receiving message from: replies
15:04:12,327 DEBUG ion.TcpNioConnection| localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195 Reading...
15:04:12,328 DEBUG ion.TcpNioConnection| localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195 Running an assembler
15:04:12,329 DEBUG ion.TcpNioConnection| Read 4 into raw buffer
15:04:12,329 DEBUG ion.TcpNioConnection| localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195 Reading...
15:04:12,330 DEBUG ion.TcpNioConnection| Read 0 into raw buffer
15:04:12,332 DEBUG yteArrayLfSerializer| Available to read:4
15:04:12,333 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'messageBuilderFactory'
15:04:12,336 DEBUG channel.QueueChannel| preSend on channel 'replies', message: GenericMessage [payload=byte[3], headers={ip_tcp_remotePort=33500, ip_connectionId=localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195, ip_localInetAddress=0.0.0.0/0.0.0.0, ip_address=127.0.0.1, id=9bd90a54-cc2b-d3d0-ca63-6355f53dee7c, ip_hostname=localhost, timestamp=1539176652336}]
15:04:12,339 DEBUG channel.QueueChannel| postReceive on channel 'replies', message: GenericMessage [payload=byte[3], headers={ip_tcp_remotePort=33500, ip_connectionId=localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195, ip_localInetAddress=0.0.0.0/0.0.0.0, ip_address=127.0.0.1, id=9bd90a54-cc2b-d3d0-ca63-6355f53dee7c, ip_hostname=localhost, timestamp=1539176652336}]
15:04:12,340 DEBUG nnel.ChannelConsumer| Received message from: replies
15:04:12,340 DEBUG tListableBeanFactory| Returning cached instance of singleton bean 'citrusServiceRepliesEndpoint'
15:04:12,354 DEBUG channel.QueueChannel| postSend (sent=true) on channel 'replies', message: GenericMessage [payload=byte[3], headers={ip_tcp_remotePort=33500, ip_connectionId=localhost:33500:56004:1174f4c6-608c-44c6-aeec-33da4074e195, ip_localInetAddress=0.0.0.0/0.0.0.0, ip_address=127.0.0.1, id=9bd90a54-cc2b-d3d0-ca63-6355f53dee7c, ip_hostname=localhost, timestamp=1539176652336}]
15:04:12,379 INFO  port.LoggingReporter| 
15:04:12,381 ERROR port.LoggingReporter| TEST FAILED sendSpringIntegrationMessageTest <com.consol.citrus.samples> Nested exception is: 
com.consol.citrus.exceptions.CitrusRuntimeException: Could not find proper message validator for message type 'XML', please define a capable message validator for this message type

EDIT3

似乎在为测试调整了我的环境之后,我终于在我的上下文文件中遗漏了这一行:

<int:object-to-string-transformer id="transformer" input-channel="replies" output-channel="response" />

感谢您将我推向正确的方向。