在 Spring 集成中读取 Aws Kinesis Streams 时全局处理特定于通道的错误
Handle error globally and specific to channel while reading Aws Kinesis Streams in Spring Intehration
我正在收听 aws kinesis 流并在全局和特定于通道启用错误处理,如下所示。
应用程序 Yml
server:
port: 8090
eureka.client.enabled: false
spring:
cloud:
stream:
bindings:
MyInboundChannel:
group: myGroup
destination: awsstream
content-type: application/json
errorChannelEnabled: true
errors:
destination: myGrouperrorChannel
cloud:
aws:
region:
static: us-east-1
credentials:
accessKey: accessKey
secretKey: secretKey
kinesis:
endpoint: endpoint
我已经为全局错误通道编写了如下处理程序
错误处理程序
public class ErrorHandler {
@ServiceActivator(inputChannel = "errorChannel")
public void errorChannel(Throwable message) {
log.error("error has been reported " + message);
}
}
我的频道特定错误频道的配置
@Bean(name = "myGrouperrorChannel")
public MessageChannel myGrouperrorChannel() {
return new DirectChannel();
}
消息监听器和通道错误监听器
@EnableBinding(Binder.class)
public class MessageListener {
@StreamListener("MyInboundChannel")
public void receiveMessage(String message) {
System.out.println("MyInboundChannel" + message);
//throw exception explicitly
throw new RuntimeException("Iris error");
}
@StreamListener("myGrouperrorChannel")
public void myGrouperrorChannel(Message message) {
log.error("myGrouperrorChannel has been reported " + message);
}
}
活页夹接口
public interface Binder {
@Input("MyInboundChannel")
SubscribableChannel itemMessage();
}
错误既不由全局错误通道处理,也不由我的通道特定错误通道处理
我是不是漏了什么。
异常日志
Exception in thread "-kinesis-consumer-1" org.springframework.messaging.MessagingException: Exception thrown while invoking com.package.MessageListener #receiveMessage[1 args]; nested exception is java.lang.RuntimeException: my error, failedMessage=GenericMessage [payload={hello
}, headers={aws_partitionKey=partitionKey-0, aws_shard=shardId-000000000000, aws_sequenceNumber=99579810409965332842672442827229165777883733894461652994, id=12d9f4a8-b5d2-2252-647f-b28bad087f5c, contentType=application/json, aws_stream=awsstream, timestamp=1519149049433}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:82)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:912)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.access00(KinesisMessageDrivenChannelAdapter.java:688)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.run(KinesisMessageDrivenChannelAdapter.java:822)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1003)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: my error
at com.package.MessageListener.receiveMessage(MessageListener.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 30 more
谁能帮忙
更新
我尝试使用以下代码但无法捕获错误。我尝试了 Stream Listener 和服务激活器。如果我做错了什么,你能帮帮我吗?
应用程序 Yml
server.port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: awsstream.myprocessor
content-type: application/json
producer:
partitionKeyExpression: "1"
input:
group: grp
destination: awsstream.mysource
content-type: application/json
errorChannelEnabled: true
input2:
group: grp2
destination: awsstream.mysink
content-type: application/json
Class
@EnableBinding(Processor.class)
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@StreamListener(Processor.INPUT)
public void transform(String message) {
if ("junk".equals(message)) {
System.out.println("processor"+message);
}
else {
throw new IllegalStateException("Invalid payload: " + message);
}
}
@Bean(name = Processor.INPUT + "." + "grp" + ".errors")
public SubscribableChannel consumerErrorChannel2() {
return new PublishSubscribeChannel();
}
@ServiceActivator(inputChannel="errorChannel")
public void processMessage(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener("errorChannel")
public void processMessagea(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener("errorChannel")
public void processMessagea(Message message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@ServiceActivator(inputChannel="errorChannel")
public void processMessage2(Message message) {
System.out.println("Errror has been reported message"+message);
}
@ServiceActivator(inputChannel=Processor.INPUT + "." + "grp" + ".errors")
public void processMessage22(Message message) {
System.out.println("Errror has been reported message"+message);
}
@ServiceActivator(inputChannel=Processor.INPUT + "." + "grp" + ".errors")
public void processMessageg(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener(Processor.INPUT + "." + "grp" + ".errors")
public void processMessageaddd(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener(Processor.INPUT + "." + "grp" + ".errors")
public void processMessageaas(Message message) {
System.out.println("Errror has been reported errorMessage"+message);
}
}
并且不会在任何地方发现错误
Exception in thread "-kinesis-consumer-1" org.springframework.messaging.MessagingException: Exception thrown while invoking com.company.cloud.MyApplication#transform[1 args]; nested exception is java.lang.IllegalStateException: Invalid payload: aasa, failedMessage=GenericMessage [payload=aasa, headers={aws_partitionKey=partitionKey-0, aws_shard=shardId-000000000000, aws_sequenceNumber=49579810409608520919496497856488469248195805892912873474, id=2595d37f-82e1-b6bd-beed-3a9a3bb7243e, contentType=application/json, aws_stream=awsstream.mysource, timestamp=1520087287991}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:82)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:912)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.access00(KinesisMessageDrivenChannelAdapter.java:688)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.run(KinesisMessageDrivenChannelAdapter.java:822)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1003)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Invalid payload: aasa
at com.company.cloud.MyApplication.transform(MyApplication.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
您知道它按设计工作。只是问题不是很明显。
我刚刚将集成测试推送到 Kinesis Binder 项目。
您可以在 KinesisBinderProcessorTests
中找到它。
但是大体思路是这样的:
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = "spring.cloud.stream.bindings.input.group = " + KinesisBinderProcessorTests.CONSUMER_GROUP)
@DirtiesContext
public class KinesisBinderProcessorTests {
static final String CONSUMER_GROUP = "testGroup";
...
@EnableBinding(Processor.class)
public static class ProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(Message<String> message) {
String payload = message.getPayload();
if (!"junk".equals(payload)) {
return payload.toUpperCase();
}
else {
throw new IllegalStateException("Invalid payload: " + payload);
}
}
@Bean(name = Processor.INPUT + "." + CONSUMER_GROUP + ".errors")
public SubscribableChannel consumerErrorChannel() {
return new PublishSubscribeChannel();
}
注意 group
属性 以及如何使用它来声明 consumerErrorChannel
。
那么,您需要用 @StreamListener("MyInboundChannel.myGroup.errors")
代替 @StreamListener("myGrouperrorChannel")
。在 @StreamListener
上使用 destination
定义是错误的。
我还检查了来自消费者的错误被发送到全局的测试 errorChannel
。不确定你那里发生了什么......
我正在收听 aws kinesis 流并在全局和特定于通道启用错误处理,如下所示。
应用程序 Yml
server:
port: 8090
eureka.client.enabled: false
spring:
cloud:
stream:
bindings:
MyInboundChannel:
group: myGroup
destination: awsstream
content-type: application/json
errorChannelEnabled: true
errors:
destination: myGrouperrorChannel
cloud:
aws:
region:
static: us-east-1
credentials:
accessKey: accessKey
secretKey: secretKey
kinesis:
endpoint: endpoint
我已经为全局错误通道编写了如下处理程序
错误处理程序
public class ErrorHandler {
@ServiceActivator(inputChannel = "errorChannel")
public void errorChannel(Throwable message) {
log.error("error has been reported " + message);
}
}
我的频道特定错误频道的配置
@Bean(name = "myGrouperrorChannel")
public MessageChannel myGrouperrorChannel() {
return new DirectChannel();
}
消息监听器和通道错误监听器
@EnableBinding(Binder.class)
public class MessageListener {
@StreamListener("MyInboundChannel")
public void receiveMessage(String message) {
System.out.println("MyInboundChannel" + message);
//throw exception explicitly
throw new RuntimeException("Iris error");
}
@StreamListener("myGrouperrorChannel")
public void myGrouperrorChannel(Message message) {
log.error("myGrouperrorChannel has been reported " + message);
}
}
活页夹接口
public interface Binder {
@Input("MyInboundChannel")
SubscribableChannel itemMessage();
}
错误既不由全局错误通道处理,也不由我的通道特定错误通道处理
我是不是漏了什么。
异常日志
Exception in thread "-kinesis-consumer-1" org.springframework.messaging.MessagingException: Exception thrown while invoking com.package.MessageListener #receiveMessage[1 args]; nested exception is java.lang.RuntimeException: my error, failedMessage=GenericMessage [payload={hello
}, headers={aws_partitionKey=partitionKey-0, aws_shard=shardId-000000000000, aws_sequenceNumber=99579810409965332842672442827229165777883733894461652994, id=12d9f4a8-b5d2-2252-647f-b28bad087f5c, contentType=application/json, aws_stream=awsstream, timestamp=1519149049433}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:82)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:912)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.access00(KinesisMessageDrivenChannelAdapter.java:688)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.run(KinesisMessageDrivenChannelAdapter.java:822)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1003)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: my error
at com.package.MessageListener.receiveMessage(MessageListener.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 30 more
谁能帮忙
更新 我尝试使用以下代码但无法捕获错误。我尝试了 Stream Listener 和服务激活器。如果我做错了什么,你能帮帮我吗?
应用程序 Yml
server.port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: awsstream.myprocessor
content-type: application/json
producer:
partitionKeyExpression: "1"
input:
group: grp
destination: awsstream.mysource
content-type: application/json
errorChannelEnabled: true
input2:
group: grp2
destination: awsstream.mysink
content-type: application/json
Class
@EnableBinding(Processor.class)
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@StreamListener(Processor.INPUT)
public void transform(String message) {
if ("junk".equals(message)) {
System.out.println("processor"+message);
}
else {
throw new IllegalStateException("Invalid payload: " + message);
}
}
@Bean(name = Processor.INPUT + "." + "grp" + ".errors")
public SubscribableChannel consumerErrorChannel2() {
return new PublishSubscribeChannel();
}
@ServiceActivator(inputChannel="errorChannel")
public void processMessage(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener("errorChannel")
public void processMessagea(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener("errorChannel")
public void processMessagea(Message message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@ServiceActivator(inputChannel="errorChannel")
public void processMessage2(Message message) {
System.out.println("Errror has been reported message"+message);
}
@ServiceActivator(inputChannel=Processor.INPUT + "." + "grp" + ".errors")
public void processMessage22(Message message) {
System.out.println("Errror has been reported message"+message);
}
@ServiceActivator(inputChannel=Processor.INPUT + "." + "grp" + ".errors")
public void processMessageg(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener(Processor.INPUT + "." + "grp" + ".errors")
public void processMessageaddd(ErrorMessage message) {
System.out.println("Errror has been reported errorMessage"+message);
}
@StreamListener(Processor.INPUT + "." + "grp" + ".errors")
public void processMessageaas(Message message) {
System.out.println("Errror has been reported errorMessage"+message);
}
}
并且不会在任何地方发现错误
Exception in thread "-kinesis-consumer-1" org.springframework.messaging.MessagingException: Exception thrown while invoking com.company.cloud.MyApplication#transform[1 args]; nested exception is java.lang.IllegalStateException: Invalid payload: aasa, failedMessage=GenericMessage [payload=aasa, headers={aws_partitionKey=partitionKey-0, aws_shard=shardId-000000000000, aws_sequenceNumber=49579810409608520919496497856488469248195805892912873474, id=2595d37f-82e1-b6bd-beed-3a9a3bb7243e, contentType=application/json, aws_stream=awsstream.mysource, timestamp=1520087287991}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access00(KinesisMessageDrivenChannelAdapter.java:82)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:912)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.access00(KinesisMessageDrivenChannelAdapter.java:688)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.run(KinesisMessageDrivenChannelAdapter.java:822)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1003)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Invalid payload: aasa
at com.company.cloud.MyApplication.transform(MyApplication.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
您知道它按设计工作。只是问题不是很明显。
我刚刚将集成测试推送到 Kinesis Binder 项目。
您可以在 KinesisBinderProcessorTests
中找到它。
但是大体思路是这样的:
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = "spring.cloud.stream.bindings.input.group = " + KinesisBinderProcessorTests.CONSUMER_GROUP)
@DirtiesContext
public class KinesisBinderProcessorTests {
static final String CONSUMER_GROUP = "testGroup";
...
@EnableBinding(Processor.class)
public static class ProcessorConfiguration {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(Message<String> message) {
String payload = message.getPayload();
if (!"junk".equals(payload)) {
return payload.toUpperCase();
}
else {
throw new IllegalStateException("Invalid payload: " + payload);
}
}
@Bean(name = Processor.INPUT + "." + CONSUMER_GROUP + ".errors")
public SubscribableChannel consumerErrorChannel() {
return new PublishSubscribeChannel();
}
注意 group
属性 以及如何使用它来声明 consumerErrorChannel
。
那么,您需要用 @StreamListener("MyInboundChannel.myGroup.errors")
代替 @StreamListener("myGrouperrorChannel")
。在 @StreamListener
上使用 destination
定义是错误的。
我还检查了来自消费者的错误被发送到全局的测试 errorChannel
。不确定你那里发生了什么......