具有自动配置的属性不适用于 spring 云流和 rabbitmq
Properties with auto configure not working on spring cloud stream and rabbitmq
我有一个带有属性的配置服务器和一个作为消费者的微服务。
我尝试配置 maxAttempts 以避免重试消费者微服务,但它似乎不起作用。
我还在配置服务器上定义了绑定属性,它们工作正常。我的消费者正在收听和接收消息,但它尝试了 3 次 然后崩溃了。
这是我的 application.yml 在我的配置服务器中
server:
servlet:
contextPath: /cmsrsssitemap/v1
spring:
cloud:
stream:
bindings:
sitemap-main-output:
destination: sitemap-main
group: cms-microservices-v1
content-type: application/json
#consumer.concurrency: 2
test-job-output:
destination: test-job
group: cms-microservices-v1
content-type: application/json
rabbit:
bindings:
test-job-output:
consumer:
maxAttempts: 1
requeueRejected: false
autoBindDlq: true
#dlqTtl: 5000
#requeueRejected: false
#dlqDeadLetterExchange: dltexchange1
#republishToDlq: true
这是制作方的application.yml
server.servlet.contextPath: /cmsjmshandler/v1
spring:
cloud:
stream:
bindings:
sitemap-main-input:
destination: sitemap-main
content-type: application/json
test-job-input:
destination: test-job
group: cms-microservices-v1
content-type: application/json
这是监听器。它抛出一个 NullPointer 用于测试目的
@Component
public class TestJobListener {
@StreamListener(StreamProcessor.TEST_JOB)
public void testJobInput(@Payload String input) throws InterruptedException {
// Thread.sleep(3000);
System.out.println("########################### "+new Date() + " Mensaje Recibido");
throw new NullPointerException();
}
}
StreamProcesor.java
public interface StreamProcessor {
public static final String TEST_JOB = "test-job";
public static final String SITEMAP_MAIN = "sitemap-main";
@Input(StreamProcessor.TEST_JOB)
SubscribableChannel testJobOutputInput();
@Input(StreamProcessor.SITEMAP_MAIN)
SubscribableChannel processSitemapMain();
}
这样做的目的是将失败的消息移至 DLQ,但它也不起作用
编辑 1:无法正常工作。我已经根据 Artem Bilan 进行了更改,但它也不起作用。
server:
servlet:
contextPath: /cmsrsssitemap/v1
spring:
cloud:
stream:
bindings:
test-job-output:
destination: test-job
group: cms-microservices-v1
content-type: application/json
consumer:
maxAttempts: 1
rabbit:
bindings:
test-job-output:
consumer:
requeueRejected: false
maxAttempts
不是 rabbit
属性。是核心。
spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
问题是我在 StreamProcesor 上输入了错误的名称
@StreamListener(StreamProcessor.TEST_JOB)
StreamProcesor.TEST_JOB 应该是频道名称,也不是目的地。更新我的问题。
更正 SteamProcesor.java
StreamProcesor.java
public interface StreamProcessor {
public static final String TEST_JOB = "test-job-output";
public static final String SITEMAP_MAIN = "sitemap-main";
@Input(StreamProcessor.TEST_JOB)
SubscribableChannel testJobOutputInput();
@Input(StreamProcessor.SITEMAP_MAIN)
SubscribableChannel processSitemapMain();
}
我刚刚对其进行了测试,它可以很好地配合我的这个(已更正的)配置。如果您在客户端启用 actuator/env
端点并且您可以看到属性:
(我使用了 input
和基于本地文件的配置服务器)。
我有一个带有属性的配置服务器和一个作为消费者的微服务。
我尝试配置 maxAttempts 以避免重试消费者微服务,但它似乎不起作用。
我还在配置服务器上定义了绑定属性,它们工作正常。我的消费者正在收听和接收消息,但它尝试了 3 次 然后崩溃了。
这是我的 application.yml 在我的配置服务器中
server:
servlet:
contextPath: /cmsrsssitemap/v1
spring:
cloud:
stream:
bindings:
sitemap-main-output:
destination: sitemap-main
group: cms-microservices-v1
content-type: application/json
#consumer.concurrency: 2
test-job-output:
destination: test-job
group: cms-microservices-v1
content-type: application/json
rabbit:
bindings:
test-job-output:
consumer:
maxAttempts: 1
requeueRejected: false
autoBindDlq: true
#dlqTtl: 5000
#requeueRejected: false
#dlqDeadLetterExchange: dltexchange1
#republishToDlq: true
这是制作方的application.yml
server.servlet.contextPath: /cmsjmshandler/v1
spring:
cloud:
stream:
bindings:
sitemap-main-input:
destination: sitemap-main
content-type: application/json
test-job-input:
destination: test-job
group: cms-microservices-v1
content-type: application/json
这是监听器。它抛出一个 NullPointer 用于测试目的
@Component
public class TestJobListener {
@StreamListener(StreamProcessor.TEST_JOB)
public void testJobInput(@Payload String input) throws InterruptedException {
// Thread.sleep(3000);
System.out.println("########################### "+new Date() + " Mensaje Recibido");
throw new NullPointerException();
}
}
StreamProcesor.java
public interface StreamProcessor {
public static final String TEST_JOB = "test-job";
public static final String SITEMAP_MAIN = "sitemap-main";
@Input(StreamProcessor.TEST_JOB)
SubscribableChannel testJobOutputInput();
@Input(StreamProcessor.SITEMAP_MAIN)
SubscribableChannel processSitemapMain();
}
这样做的目的是将失败的消息移至 DLQ,但它也不起作用
编辑 1:无法正常工作。我已经根据 Artem Bilan 进行了更改,但它也不起作用。
server:
servlet:
contextPath: /cmsrsssitemap/v1
spring:
cloud:
stream:
bindings:
test-job-output:
destination: test-job
group: cms-microservices-v1
content-type: application/json
consumer:
maxAttempts: 1
rabbit:
bindings:
test-job-output:
consumer:
requeueRejected: false
maxAttempts
不是 rabbit
属性。是核心。
spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
问题是我在 StreamProcesor 上输入了错误的名称
@StreamListener(StreamProcessor.TEST_JOB)
StreamProcesor.TEST_JOB 应该是频道名称,也不是目的地。更新我的问题。
更正 SteamProcesor.java StreamProcesor.java
public interface StreamProcessor {
public static final String TEST_JOB = "test-job-output";
public static final String SITEMAP_MAIN = "sitemap-main";
@Input(StreamProcessor.TEST_JOB)
SubscribableChannel testJobOutputInput();
@Input(StreamProcessor.SITEMAP_MAIN)
SubscribableChannel processSitemapMain();
}
我刚刚对其进行了测试,它可以很好地配合我的这个(已更正的)配置。如果您在客户端启用 actuator/env
端点并且您可以看到属性:
(我使用了 input
和基于本地文件的配置服务器)。