Spring Cloud Dataflow: type conversion not working in processor component?
I have a processor that converts a byte[] payload into a MyClass payload:
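import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.ServiceActivator;

@Slf4j
@EnableBinding(Processor.class)
public class MyDecoder {

    // Receives raw bytes on the input channel and emits the decoded object on the output channel.
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public MyClass decode(final byte[] payload) {
        MyClass decoded = doStuff(payload);
        if (decoded != null) {
            log.info("Successfully decoded!");
        }
        return decoded;
    }
}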
I tried creating the following DSL:
some-source | my-decoder | some-sink
and some-sink reports an error because MyClass is not on its class loader. This is the expected behaviour.
I then tried applying type conversion on my-decoder, e.g.:
some-source | my-decoder --spring.cloud.stream.bindings.output.contentType=application/json | some-sink
and I get the following error in the my-decoder log:
2017-01-20 21:45:17.278 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded!
2017-01-20 21:45:18.441 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded!
2017-01-20 21:45:20.512 INFO 9408 --- [afka-listener-2] com.example.MyDecoder : Successfully decoded!
2017-01-20 21:45:20.515 ERROR 9408 --- [afka-listener-2] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = example.some-source, partition = 0, offset = 1, key = null, value = [B@7dfad000)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access[=12=]0(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.5.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.5.RELEASE.jar!/:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.4.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.4.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access00(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.4.RELEASE.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:778) [spring-kafka-1.0.4.RELEASE.jar!/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.lang.IllegalArgumentException: payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:115) ~[spring-core-4.3.5.RELEASE.jar!/:4.3.5.RELEASE]
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:194) ~[spring-cloud-stream-1.1.0.RELEASE.jar!/:1.1.0.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415) ~[spring-integration-core-4.3.6.RELEASE.jar!/:4.3.6.RELEASE]
... 42 common frames omitted
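I can see that the message was converted from byte[] to MyClass and is not null. I don't understand why I see the message 3 times before the failure, since the Kafka property 'retries' is 0, as shown in the my-decoder log on start: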
2017-01-20 21:44:32.080 INFO 9408 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = 1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
I tried writing an integration test:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public abstract class MyDecoderTests {

    @Autowired
    protected Processor channels;

    @Autowired
    protected MessageCollector collector;

    // No output content type configured: the payload should stay a MyClass instance.
    public static class UsingNothingIntegrationTests extends MyDecoderTests {
        @Test
        public void test() throws Exception {
            channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray())));
            assertThat(collector.forChannel(channels.output()), receivesPayloadThat(instanceOf(MyClass.class)));
        }
    }

    // Output content type set to JSON: the payload should arrive as a JSON string.
    @SpringBootTest("spring.cloud.stream.bindings.output.contentType=application/json")
    public static class UsingOutputConverterIntegrationTests extends MyDecoderTests {
        @Test
        public void test() throws Exception {
            channels.input().send(new GenericMessage<Object>(Hex.decodeHex("ff".toCharArray())));
            assertThat(collector.forChannel(channels.output()), receivesPayloadThat(is("{\"example\": true}")));
        }
    }

    @Configuration
    @EnableAutoConfiguration
    @Import(MyDecoderConfiguration.class)
    public static class MyDecoderTestApplication {
    }
}
The tests run successfully and the conversion takes place.
I then suspected that my DSL was incorrect, so I wrote a new source to test it:
@Bean
@InboundChannelAdapter(Source.OUTPUT)
public MessageSource<MyClass> exampleSource() {
    return () -> new GenericMessage<>(getMyClassObject());
}
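The following DSL converts MyClass to JSON as expected:
my-source --spring.cloud.stream.bindings.output.contentType=application/json | some-sink
Why is the decoding message logged 3 times, and why does it fail with the 'payload must not be null' message? Is something wrong with my processor?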
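You see 3 attempts because that is the default retry configuration for the input channel in the binder. It has nothing to do with the Kafka producer's 'retries' property.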
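The retry behaviour can be sketched in plain Java. This is a simplified model, not the binder's actual code: it assumes the Spring Cloud Stream consumer defaults of maxAttempts=3, backOffInitialInterval=1000 and backOffMultiplier=2.0, and the class and method names (RetrySketch, retry, decodeAndSend) are made up for illustration. The handler logs its success and then fails on the send, as in the question.

```java
import java.util.ArrayList;
import java.util.List;

public class RetrySketch {

    /** How many times the handler has been invoked. */
    public static int invocations = 0;

    // Hypothetical stand-in for the processor: decoding works, the send to 'output' fails.
    public static void decodeAndSend() {
        invocations++;
        System.out.println("Successfully decoded!");
        throw new IllegalArgumentException("payload must not be null");
    }

    // Calls the handler up to maxAttempts times and returns the back-off (in ms)
    // that would be waited before each retry. No real sleeping is done.
    public static List<Long> retry(int maxAttempts, long initialMs, double multiplier, Runnable handler) {
        List<Long> waits = new ArrayList<>();
        long wait = initialMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.run();
                break; // success: no further attempts
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    // Retries exhausted: only now is the error reported.
                    System.out.println("Error while processing: " + e.getMessage());
                } else {
                    waits.add(wait);
                    wait = (long) (wait * multiplier);
                }
            }
        }
        return waits;
    }

    public static void main(String[] args) {
        List<Long> waits = retry(3, 1000L, 2.0, RetrySketch::decodeAndSend);
        System.out.println("invocations=" + invocations + ", waits(ms)=" + waits);
    }
}
```

Under these assumptions the handler runs three times with waits of about 1 s and 2 s in between, which matches the spacing of the three "Successfully decoded!" lines in the log. If a single attempt is preferred, the consumer property spring.cloud.stream.bindings.input.consumer.maxAttempts=1 should disable the retries.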
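There is a bug in the binder: if the converter can't convert the message, it attempts to create a message with a null payload.
The reason it can't convert the payload is that it sees the inbound content-type (presumably application/octet-stream) and can't convert that to JSON.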
A work-around is to add a file to the classpath:
META-INF/spring.integration.properties
and add
spring.integration.readOnly.headers=contentType
to it.
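This prevents the inbound content-type header from being propagated to the outbound message.
This requires Spring Integration 4.3.2 or later.
In a future version of SCSt (Spring Cloud Stream), this will be set by default.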
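To illustrate why marking contentType as read-only helps, here is a toy model in plain Java. It is only a sketch of the behaviour described above, not Spring Integration or Spring Cloud Stream code: propagate and toJson are hypothetical helpers, and the converter is assumed to decline (return null) whenever the message still carries the inbound application/octet-stream content type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReadOnlyHeaderSketch {

    // Toy header propagation: copy inbound headers to the reply,
    // skipping every header listed as read-only.
    public static Map<String, String> propagate(Map<String, String> inbound, Set<String> readOnly) {
        Map<String, String> outbound = new HashMap<>();
        for (Map.Entry<String, String> header : inbound.entrySet()) {
            if (!readOnly.contains(header.getKey())) {
                outbound.put(header.getKey(), header.getValue());
            }
        }
        return outbound;
    }

    // Toy output converter (assumption): declines with null when the message
    // already claims to be raw bytes, otherwise renders a trivial JSON string.
    public static String toJson(Object payload, Map<String, String> headers) {
        if ("application/octet-stream".equals(headers.get("contentType"))) {
            return null;
        }
        return "{\"value\":\"" + payload + "\"}";
    }

    public static void main(String[] args) {
        Map<String, String> inbound = new HashMap<>();
        inbound.put("contentType", "application/octet-stream");

        // Default: the inbound header leaks into the reply and conversion yields null.
        Map<String, String> leaked = propagate(inbound, Collections.<String>emptySet());
        System.out.println("default:   " + toJson("decoded", leaked));

        // With contentType read-only the header is dropped and conversion proceeds.
        Map<String, String> filtered = propagate(inbound, Collections.singleton("contentType"));
        System.out.println("read-only: " + toJson("decoded", filtered));
    }
}
```

In this model the first call returns null, which corresponds to the null payload the interceptor then tries to wrap in a message. The second call produces JSON because the stale header is never copied to the reply.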