Spring Kafka 无法使用便携式实现

Spring Kafka not working with portable implementation

如果这是一个微不足道的问题,请耐心等待。我已经用谷歌搜索了,但我没有找到答案。

我正在学习 Spring 事件驱动开发的教程。此时此刻,目标是 agnostic/portable 实现事件驱动的实现。

为此,我有 2 个 Spring 引导项目,一个 producing/publishing 主题,另一个使用主题。

这两个项目都是为了管理 RabbitMQ 和 Kafka 而创建的,无需更改代码(或者这是意图)。

使用 RabbitMQ 一切正常,但在 Kafka 中它不起作用。问题似乎是 Kafka 生产者在主题中添加了项目名称作为前缀,而消费者不知道这个前缀。

配置:

制作人项目application.yml(仅相关部分)

spring.cloud.stream:
  bindings:
    output-products:
      destination: products
      producer:
        required-groups: auditGroup
---
spring.config.activate.on-profile: kafka

spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092

消费者项目application.yml(仅相关部分)

spring.cloud.stream.bindings.input:
  destination: products
  group: productsGroup
---
spring.config.activate.on-profile: kafka

spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092

对于生产者class,我创建了一个接口并声明为一个bean

public interface MessageSources {

    String OUTPUT_PRODUCTS = "output-products";
    String OUTPUT_RECOMMENDATIONS = "output-recommendations";
    String OUTPUT_REVIEWS = "output-reviews";

    @Output(OUTPUT_PRODUCTS)
    MessageChannel outputProducts();

    @Output(OUTPUT_RECOMMENDATIONS)
    MessageChannel outputRecommendations();

    @Output(OUTPUT_REVIEWS)
    MessageChannel outputReviews();
}

后来,我用这个bean来发布话题

@EnableBinding(ProductCompositeIntegration.MessageSources.class)
@Component
public class ProductCompositeIntegration {
    public Product createProduct(Product body) {
        messageSources.outputProducts().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }

对于消费者 class 绑定,我使用 Sink.class(记住,我想要一个可移植的解决方案)

@EnableBinding(Sink.class)
public class MessageProcessor {

    private static final Logger log = LoggerFactory
            .getLogger(MessageProcessor.class);

    private final ProductService productService;

    @Autowired
    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

        log.info("Process message created at {}...", event.getEventCreatedAt());

        switch (event.getEventType()) {
....

一切就绪,并为 RabbitMQ 配置后,这项工作正常。但是当我尝试使用 Kafka 时,出现错误:

Dispatcher has no subscribers for channel 'product-composite-1.output-products'

其中 product-composite 是制作者项目的名称。

供参考,这是自动创建的主题列表

bash-4.4# kafka-topics.sh  --zookeeper zookeeper:2181 --list
__consumer_offsets
error.products.productsGroup
error.recommendations.recommendationsGroup
error.reviews.reviewsGroup
products
recommendations
reviews
zipkin

因此,自动配置处于活动状态的 kafka 库似乎无法连接到主题:

spring.cloud.stream.bindings.<messagechannel>.destination: <topicname>

升级库版本后解决的问题:

不工作

plugins {
    id 'org.springframework.boot' version '2.4.0'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}

ext {
    set('springCloudVersion', "2020.0.0-M5")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}

正在工作

plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}

ext {
    set('springCloudVersion', "2020.0.0")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}