具有自动配置的属性不适用于 spring 云流和 rabbitmq

Properties with auto configure not working on spring cloud stream and rabbitmq

我有一个带有属性的配置服务器和一个作为消费者的微服务。

我尝试配置 maxAttempts 以避免重试消费者微服务,但它似乎不起作用。

我还在配置服务器上定义了绑定属性,它们工作正常。我的消费者正在收听和接收消息,但它尝试了 3 次 然后崩溃了。

这是我的 application.yml 在我的配置服务器中

server:
  servlet:
    contextPath: /cmsrsssitemap/v1

spring:
  cloud:
    stream:
      bindings:
        sitemap-main-output:
          destination: sitemap-main
          group: cms-microservices-v1
          content-type: application/json
          #consumer.concurrency: 2
        test-job-output:
          destination: test-job
          group: cms-microservices-v1
          content-type: application/json
      rabbit:
        bindings:
          test-job-output: 
            consumer:
              maxAttempts: 1
              requeueRejected: false
              autoBindDlq: true
              #dlqTtl: 5000
              #requeueRejected: false
              #dlqDeadLetterExchange: dltexchange1
              #republishToDlq: true

这是制作方的application.yml

server.servlet.contextPath: /cmsjmshandler/v1

spring:
  cloud:
    stream:
      bindings:
        sitemap-main-input:         
          destination: sitemap-main
          content-type: application/json
        test-job-input:
          destination: test-job
          group: cms-microservices-v1
          content-type: application/json

这是监听器。它抛出一个 NullPointer 用于测试目的

@Component
public class TestJobListener {


    @StreamListener(StreamProcessor.TEST_JOB)
    public void testJobInput(@Payload String input) throws InterruptedException {
//      Thread.sleep(3000);
        System.out.println("########################### "+new Date() + " Mensaje Recibido");
        throw new NullPointerException();
    }
}

StreamProcesor.java

public interface StreamProcessor {

    public static final String TEST_JOB = "test-job";

    public static final String SITEMAP_MAIN = "sitemap-main";


    @Input(StreamProcessor.TEST_JOB)
    SubscribableChannel testJobOutputInput();

    @Input(StreamProcessor.SITEMAP_MAIN)
    SubscribableChannel processSitemapMain();
}

这样做的目的是将失败的消息移至 DLQ,但它也不起作用

编辑 1:无法正常工作。我已经根据 Artem Bilan 进行了更改,但它也不起作用。

server:
  servlet:
    contextPath: /cmsrsssitemap/v1

spring:
  cloud:
    stream:
      bindings:
        test-job-output:
          destination: test-job
          group: cms-microservices-v1
          content-type: application/json
          consumer:
            maxAttempts: 1
      rabbit:
        bindings:
          test-job-output: 
            consumer:
              requeueRejected: false

maxAttempts 不是 rabbit 属性。是核心。

文档中有关于此事的示例:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#spring-cloud-stream-overview-error-handling

spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

问题是我在 StreamProcesor 上输入了错误的名称

@StreamListener(StreamProcessor.TEST_JOB)

StreamProcesor.TEST_JOB 应该是频道名称,也不是目的地。更新我的问题。

更正 SteamProcesor.java StreamProcesor.java

public interface StreamProcessor {

    public static final String TEST_JOB = "test-job-output";

    public static final String SITEMAP_MAIN = "sitemap-main";


    @Input(StreamProcessor.TEST_JOB)
    SubscribableChannel testJobOutputInput();

    @Input(StreamProcessor.SITEMAP_MAIN)
    SubscribableChannel processSitemapMain();
}

我刚刚对其进行了测试,它可以很好地配合我的这个(已更正的)配置。如果您在客户端启用 actuator/env 端点并且您可以看到属性:

(我使用了 input 和基于本地文件的配置服务器)。