Spring-Kafka:如何在 Producer Kafka 中插入一个 application.yml 主题

Spring-Kafka: How to insert an application.yml topic in Producer Kafka

我有一个 spring-kafka 微服务,我最近向其中添加了一个死信,以便能够发送各种错误消息

//some code..
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("myDeadLetter", message);
    }
}

我想将死信的主题kafka称为“messageTopic”+“_deadLetter”,我的主要主题是“messageTopic”。在我的消费者中,主题名称给他 application.yml 如下:

@KafkaListener(topics = "${spring.kafka.topic.name}")

如何通过插入 application.yml 中的“+ deadLetter”来设置相同的 kafka 主题?我试过这样的事情:

@Component
@KafkaListener(topics = "${spring.kafka.topic.name}"+"_deadLetter")
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send("messageTopic_deadLetter", message);
    }
}

但它为我创建了两个同名的不同主题。我正在等待一些建议,感谢您的帮助!

Kafka Listener 接受 Topic 名称的常量,我们不能在这里修改 TOPIC 名称。

最好为实际主题和死信主题使用单独的方法(Kafka 侦听器),在 YAML 中定义两个不同的属性来保存两个主题名称。

@KafkaListener(topics = "${spring.kafka.topic.name}")
public void listen(......){

}

@KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
public void listenDlt(......){

}

从 yml 或 属性 文件中引用 send(...) 中的主题名称

@Component
@KafkaListener(topics = "${spring.kafka.deadletter.topic.name}")
public class KafkaProducer {

    @Value("${spring.kafka.deadletter.topic.name}")
    private String DLT_TOPIC_NAME;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendDeadLetter(String message) {
        kafkaTemplate.send(DLT_TOPIC_NAME, message);
    }
}

你可以用SpEL构造主题名称:

@KafkaListener(topics = "#{'${spring.kafka.topic.name}' + '_deadLetter'"})

请注意 属性 占位符和文字周围的单引号。

此示例可能与您的用例无关,但分享以防对某人有帮助。

如果您正在构建 Kafka Stream 应用程序,可以通过以下方式实现可变接收器主题名称:

  1. 生成接收器主题时,传递一个将上下文作为参数的 lambda 以及将处理名称定义的方法。
        ... /* precedent stream operations */
        // terminal operation 'to'. 
        .to(
            (k, v, ctx) -> sinkTopicNameGenerator(ctx),
            Produced.with(Serdes, Serdes)
        );
  1. 实现生成接收器主题名称的方法:
  protected static String sinkTopicNameGenerator(RecordContext ctx) {
    return ctx.topic().concat("_deadLetter");
  }

上面的例子很简单,可以简化为(k, v, ctx) -> ctx.topic().concat("_deadLetter"),但我想在需要进一步转换的情况下保留单独的方法方法,即当主题名称的一部分将被一些替换时配置文件中定义的常量或正则表达式。