Spring Cloud Function and Dead Letter Exchange
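I am trying to write a reactive Spring Cloud Function service using RabbitMQ that consumes from a queue and produces to an exchange.
I have 2 questions.
- Why am I getting the error below in the logs?
- How do I use doOnError to reject a message? doOnError only has access to the Throwable, not to the message that the reject has to be performed on.
Here is the application code. It was copied from this question: Spring Reactive Stream - Unexpected Shutdown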
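import java.io.IOException;
import java.util.function.Function;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> transform() {
        return inbound -> inbound.map(msg -> {
            try {
                System.out.println("ACKING MESSAGE");
                // With acknowledge-mode MANUAL the channel and delivery tag arrive as headers.
                Channel channel = msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
                channel.basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            return msg;
        });
    }
}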
Here is the application.yml. It has 2 different binders, one for inbound from the queue and one for outbound to the exchange.
spring:
  cloud:
    stream:
      override-cloud-connectors: true
      rabbit:
        bindings:
          events-processor:
            producer:
              bindQueue: false
              declareExchange: false
              routing-key-expression: headers.eventType
          events:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10
              #auto-bind-dlq: false
              dead-letter-exchange: dead-letter-exchange
              bindQueue: false
              declareExchange: false
      function:
        definition: transform
        bindings:
          transform-in-0: events
          transform-out-0: events-processor
      bindings:
        events:
          destination: queue
          binder: consumerrabbit
          group: events-processor
        events-processor:
          destination: activity-events
          binder: producerrabbit
      binders:
        producerrabbit:
          defaultCandidate: false
          inheritEnvironment: false
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: host
                port: port
                username: username
                password: password
                virtual-host: virtual-host
        consumerrabbit:
          defaultCandidate: true
          inheritEnvironment: false
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: host
                port: port
                username: username
                password: password
                virtual-host: virtual-host
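Here are the startup logs and the logs from when the application receives an event. I am not sure why the channel has a subscriber during startup but reports no subscribers as soon as a message arrives.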
2020-09-07 12:02:04.076 INFO 10652 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.events' has 1 subscriber(s).
2020-09-07 18:02:08.848 INFO 10652 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.events-processor' has 1 subscriber(s).
2020-09-07 12:02:05.081 INFO 10652 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel events
2020-09-07 12:02:05.145 INFO 10652 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2020-09-07 12:02:05.178 INFO 10652 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel events-processor
...................other startup log messages...............
2020-09-07 18:05:09.896 INFO 10652 --- [nts-processor-1] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.events' has 0 subscriber(s).
ACKING MESSAGE
2020-09-07 18:05:12.900 ERROR 10652 --- [nts-processor-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.events'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload={
"data": {
}
}, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedExchange=source, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=source.events-processor, amqp_channel=Cached Rabbit Channel: AMQChannel(amqp:), conn: Proxy@43e869ea Shared Rabbit Connection: SimpleConnection@213451a4 [delegate=amqp:], amqp_redelivered=false, id=35cdfa7f-7f75-5502-aede-6ec9569145f0, amqp_consumerTag=amq.ctag-Z87p6nKN4PLJKDbZQpXnVw, sourceData=(Body:'[B@20dc391b(byte[401])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=source, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-Z87p6nKN4PLJKDbZQpXnVw, consumerQueue=source.events-processor]), contentType=application/json, timestamp=1599501912897}], failedMessage=GenericMessage [payload={
"data": {
}
}, headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedExchange=source, amqp_deliveryTag=1, deliveryAttempt=3, amqp_consumerQueue=source.events-processor, amqp_channel=Cached Rabbit Channel: AMQChannel(amqp://H), conn: Proxy@43e869ea Shared Rabbit Connection: SimpleConnection@213451a4 [delegate=amqp://], amqp_redelivered=false, id=35cdfa7f-7f75-5502-aede-6ec9569145f0, amqp_consumerTag=amq.ctag-Z87p6nKN4PLJKDbZQpXnVw, sourceData=(Body:'[B@20dc391b(byte[401])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=source, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-Z87p6nKN4PLJKDbZQpXnVw, consumerQueue=source.events-processor]), contentType=application/json, timestamp=1599501912897}]
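I figured out how to get the function to send messages to the DLQ on failure. I have also added a consumer, since the two are related.
I think we need to either ack or reject the message, but on a reject we want to return Flux.empty() so that nothing gets published to the downstream exchange.
The code rejects any message whose payload starts with "fail" and acks any other message.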
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> transform() {
        return inbound -> inbound.flatMap(msg -> {
            try {
                Channel channel = msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
                Long deliveryTag = msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
                if (msg.getPayload().startsWith("fail")) {
                    // requeue=false: the rejected message is not put back on the queue
                    channel.basicReject(deliveryTag, false);
                } else {
                    channel.basicAck(deliveryTag, false);
                    return Flux.just(msg);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Rejected (or failed) messages emit nothing downstream.
            return Flux.empty();
        });
    }

    @Bean
    public Consumer<Flux<Message<String>>> consumer() {
        return inbound -> inbound
            .map(msg -> {
                try {
                    Channel channel = msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
                    Long deliveryTag = msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
                    if (msg.getPayload().startsWith("fail")) {
                        channel.basicReject(deliveryTag, false);
                    } else {
                        channel.basicAck(deliveryTag, false);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return msg.getPayload();
            })
            .subscribe(System.out::println);
    }
}
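The ack-or-reject decision in transform() can be tried out without a broker. The following is only a sketch under stated assumptions: it uses java.util.stream in place of Reactor so that it runs with the JDK alone, and AckChannel, RecordingChannel and Delivery are hypothetical stand-ins (not Spring or RabbitMQ types) for the channel and message headers used above. It shows the same shape as the flatMap: a rejected message maps to an empty stream, so nothing reaches the output.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AckOrRejectSketch {

    /** Hypothetical stand-in for the two Channel calls used in transform(). */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    }

    /** Hypothetical test double that only records which tags were acked or rejected. */
    static final class RecordingChannel implements AckChannel {
        final List<Long> acked = new ArrayList<>();
        final List<Long> rejected = new ArrayList<>();
        @Override public void basicAck(long deliveryTag, boolean multiple) { acked.add(deliveryTag); }
        @Override public void basicReject(long deliveryTag, boolean requeue) { rejected.add(deliveryTag); }
    }

    /** Hypothetical message: a payload plus its delivery tag. */
    static final class Delivery {
        final String payload;
        final long deliveryTag;
        Delivery(String payload, long deliveryTag) {
            this.payload = payload;
            this.deliveryTag = deliveryTag;
        }
    }

    /** Reject payloads starting with "fail" and emit nothing; ack and pass through the rest. */
    static Stream<String> handle(Delivery d, AckChannel channel) {
        try {
            if (d.payload.startsWith("fail")) {
                channel.basicReject(d.deliveryTag, false); // requeue=false, as in the original code
                return Stream.empty();
            }
            channel.basicAck(d.deliveryTag, false);
            return Stream.of(d.payload);
        } catch (IOException e) {
            return Stream.empty(); // channel failure: publish nothing downstream
        }
    }

    static List<String> process(List<Delivery> inbound, AckChannel channel) {
        return inbound.stream().flatMap(d -> handle(d, channel)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        List<String> out = process(List.of(
                new Delivery("ok-1", 1L),
                new Delivery("fail-2", 2L),
                new Delivery("ok-3", 3L)), channel);
        // prints: [ok-1, ok-3] acked=[1, 3] rejected=[2]
        System.out.println(out + " acked=" + channel.acked + " rejected=" + channel.rejected);
    }
}
```

Keeping the decision in one small method like handle() makes it possible to check the ack and reject bookkeeping in a unit test before wiring it into the binder.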
spring:
  profiles: consumer
  cloud:
    stream:
      function:
        definition: consumer
      bindings:
        consumer-in-0:
          destination: consumer
          group: queue
      rabbit:
        bindings:
          consumer-in-0:
            consumer:
              acknowledge-mode: MANUAL
              autoBindDlq: true
              declareDlx: true
              bindQueue: true
              deadLetterExchange: dead-letter-exchange
              prefetch: 10
---
spring:
  profiles: processor
  cloud:
    stream:
      function:
        definition: transform
      bindings:
        transform-in-0:
          destination: processorIn
          group: queue
        transform-out-0:
          destination: processorOut
      rabbit:
        bindings:
          transform-in-0:
            consumer:
              acknowledge-mode: MANUAL
              autoBindDlq: true
              declareDlx: true
              bindQueue: true
              deadLetterExchange: dead-letter-exchange
              prefetch: 10
          transform-out-0:
            producer:
              bindQueue: true
              declareExchange: true
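On the doOnError part of the question: the code above avoids the problem by deciding per message, inside the lambda, where the delivery tag is still in scope. The same idea seems to extend to a processing step that throws. The sketch below is an assumption-laden illustration rather than Spring Cloud Stream code: Delivery and RecordingChannel are hypothetical stand-ins, the step function is invented for the example, and plain JDK types replace Reactor so that it runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class RejectOnErrorSketch {

    /** Hypothetical message: a payload plus its delivery tag. */
    static final class Delivery {
        final String payload;
        final long deliveryTag;
        Delivery(String payload, long deliveryTag) {
            this.payload = payload;
            this.deliveryTag = deliveryTag;
        }
    }

    /** Hypothetical test double that records acked and rejected delivery tags. */
    static final class RecordingChannel {
        final List<Long> acked = new ArrayList<>();
        final List<Long> rejected = new ArrayList<>();
        void basicAck(long deliveryTag) { acked.add(deliveryTag); }
        void basicReject(long deliveryTag) { rejected.add(deliveryTag); }
    }

    /**
     * Runs the step for one message. The try/catch sits inside the per-message
     * function, so the failing message is still available when the exception
     * is handled, unlike in a stream-level error callback that only sees the Throwable.
     */
    static Optional<String> processOne(Delivery d, Function<String, String> step,
                                       RecordingChannel channel) {
        try {
            String result = step.apply(d.payload); // may throw
            channel.basicAck(d.deliveryTag);
            return Optional.of(result);
        } catch (RuntimeException e) {
            channel.basicReject(d.deliveryTag);    // d is still in scope here
            return Optional.empty();               // nothing goes downstream
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        // Invented step for illustration: doubles a numeric payload, throws on anything else.
        Function<String, String> step = s -> Integer.toString(Integer.parseInt(s) * 2);

        Optional<String> good = processOne(new Delivery("21", 1L), step, channel);
        Optional<String> bad = processOne(new Delivery("abc", 2L), step, channel);

        // prints: Optional[42] Optional.empty acked=[1] rejected=[2]
        System.out.println(good + " " + bad
                + " acked=" + channel.acked + " rejected=" + channel.rejected);
    }
}
```

In the reactive function, the equivalent would be to wrap the body of the flatMap lambda in a try/catch that rejects and returns Flux.empty(), rather than attaching doOnError to the outer Flux.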