Duplicate message processing using Spring Cloud Stream with Kafka

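I am using Spring Cloud Stream with the Kafka binder. It works well, but the client receives duplicate messages. I have tried all the Kafka consumer properties, with no result.

See the two classes in my sample application: AggregateApplication and EventFilterApplication. If I run EventFilterApplication, each message is processed once. If I run AggregateApplication, the same message is processed twice.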


Here is my code:

1) Aggregator

import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

2) EventFilterApplication

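package com.example;

import java.util.Arrays;
import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.SubscribableChannel;

@SpringBootApplication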
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {

    @Autowired
    LiveProcessor source;

    @StreamListener(LiveProcessor.INPUT)
    public void handle(byte[] event) {
        try {

            System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));

        } catch (Exception e) {
            System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
        }
    }
    public static void main(String[] args) {
        SpringApplication.run(EventFilterApplication.class, args);
    }

    interface LiveProcessor extends Source {

        String INPUT = "liveSource";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

3) application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-broker.example.com:9092
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: kafka-zookeeper.example.com
      type: kafka
      bindings:
        liveSource:
          binder: kafka
          consumer:
            headerMode: raw
            autoCommitOffset: true
          destination: topic_example_name

4) build.gradle

buildscript {
    ext { springBootVersion = '1.4.2.RELEASE' }
    repositories {
        jcenter()
        maven { url 'http://repo.spring.io/plugins-release' }
    }
    dependencies {
        classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
    }
}

ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'

subprojects {
    apply plugin: 'java'
    apply plugin: 'propdeps'
    apply plugin: 'propdeps-idea'
    apply plugin: "io.spring.dependency-management"

    sourceCompatibility = 1.8

    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
            mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
            mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
        }
    }

    dependencies {
        compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
            exclude module: "spring-boot-starter-tomcat"
            exclude group: 'log4j'
        }

        compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")

        compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
            exclude group: "org.slf4j"
        }

        compile("org.springframework.cloud:spring-cloud-stream")

        compile("org.springframework.cloud:spring-cloud-starter-sleuth")

        compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")

        testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
            exclude group: "org.slf4j"
        }
    }
}

The duplication is caused by EventFilterApplication being passed as the parent root:

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
            .from(EventFilterApplication.class)
            .run(args);
    }
}

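This most likely creates two subscriptions: the class is registered once as the parent root and once more through from(...). Instead of adding EventFilterApplication as the root, you can simply do the following: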

public class AggregateApplication {
    public static void main(String[] args) {
        new AggregateApplicationBuilder(args)
            .from(EventFilterApplication.class)
            // rest of the pipeline
            .run(args);
    }
}

If you don't need to create an aggregate, this should be enough:

public static void main(String[] args) {
    SpringApplication.run(EventFilterApplication.class, args);
}
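
To illustrate why a second subscription would show up as duplicate processing, here is a minimal plain-Java sketch. It is not Spring Cloud Stream code: `SimpleChannel` and `deliver` are hypothetical names made up for this example, and the sketch only assumes that a channel hands each message to every registered subscriber, which is the behavior suspected above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DuplicateSubscriptionDemo {

    /** Hypothetical stand-in for a subscribable channel; not a Spring class. */
    static class SimpleChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        /** Registers a handler; registering the same handler twice adds it twice. */
        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        /** Hands the message to every registered subscriber, in registration order. */
        void send(String message) {
            for (Consumer<String> handler : subscribers) {
                handler.accept(message);
            }
        }
    }

    /**
     * Subscribes one handler the given number of times, sends a single message,
     * and returns everything the handler received.
     */
    static List<String> deliver(int subscriptions, String message) {
        SimpleChannel channel = new SimpleChannel();
        List<String> received = new ArrayList<>();
        Consumer<String> handler = received::add;
        for (int i = 0; i < subscriptions; i++) {
            channel.subscribe(handler);
        }
        channel.send(message);
        return received;
    }

    public static void main(String[] args) {
        // One subscription: the handler sees the message once.
        System.out.println("one subscription:  " + deliver(1, "event"));
        // Two subscriptions: the same handler sees the same message twice.
        System.out.println("two subscriptions: " + deliver(2, "event"));
    }
}
```

With one subscription the handler receives the message once; with two it receives the same message twice, which matches the symptom seen when running AggregateApplication. Because no Kafka consumer is involved in that second delivery, tuning consumer properties would not be expected to change it.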

Edit: added an additional example and clarified the answer.