使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId

Unable To Set groupId and clientId when using Spring Cloud Stream Kafka Binder

我在处理 Spring Cloud Stream Kafka Binder 时遇到严重问题。 Spring Cloud 3.0.2.RELEASE 的配置设置存在很多歧义和一致性问题。我一直在尝试为 Kafka 主题设置组 ID 和客户端 ID,但尽管尝试了各种不同的组合,但我仍无法正确配置组 ID。

文档声称我们应该能够通过配置以下设置之一来设置组 ID 和客户端 ID: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties

spring.cloud.stream.default.group
spring.cloud.stream.default.consumer.group
spring.cloud.stream.kafka.default.consumer.group
spring.cloud.stream.bindings.<channelName>.group

None 以上配置用于设置生产者的客户端 ID 或消费者的组 ID。我取得的唯一进展是通过完全不同的配置设置客户端 ID。

spring.kafka.client-id
spring.kafka.admin.client-id
spring.kafka.producer.client-id

在使用这些设置成功设置客户端 ID 后,我尝试为消费者设置组 ID,但令人惊讶的是它不起作用。

spring.kafka.group-id   <---- does not exist as a property, but tried this anyway
spring.kafka.consumer.group-id

编辑:这是应用程序设置。

Application.java

@SpringBootApplication
@EnableSwagger2
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Docket swaggerApi() {
    return new Docket(DocumentationType.SWAGGER_2)
        .select()
        .apis(RequestHandlerSelectors.any())
        .paths(regex("^(?!.*error).+$"))
        .build()
        .pathMapping("/");
  }
}

application.yaml

spring:
  cloud:
    stream:
      bindings:
        MyKafkaTopicBinderChannel:
          destination: MyKafkaTopic
          group: MyServiceGroup
      default:
        producer:
          useNativeEncoding: on
        consumer:
          useNativeEncoding: on
        contentType: application/*+avro
      kafka:
        binder:
          brokers: some.broker.io
          producer-properties:
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema:
              registry:
                url: some.registry.io
          consumer-properties:
            key:
              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value:
              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema:
              registry.url: some.registry.io
            specific:
              avro:
                reader: true

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>io.some.org</groupId>
  <artifactId>MyService</artifactId>
  <version>1.0.0</version>
  <name>chatbotApi</name>
  <description>Spring Boot Service</description>

  <properties>
    <java.version>11</java.version>
    <gson.version>2.8.6</gson.version>
    <springfox.version>2.9.2</springfox.version>
    <swagger-annotations.version>1.6.0</swagger-annotations.version>
    <swagger-models.version>1.6.0</swagger-models.version>
    <jackson-datatype-jsr310.version>2.10.2</jackson-datatype-jsr310.version>
    <avro.version>1.9.2</avro.version>
    <avro-maven-plugin.version>1.9.2</avro-maven-plugin.version>
    <confluent.kafka.version>5.4.0</confluent.kafka.version>
    <kafka-clients.version>2.4.0</kafka-clients.version>
    <spring-cloud.version>3.0.2.RELEASE</spring-cloud.version>
  </properties>

  <repositories>
    <repository>
      <id>confluent</id>
      <url>http://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.4.RELEASE</version>
    <relativePath/>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      <version>${spring-cloud.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>${gson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>${jackson-datatype-jsr310.version}</version>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>${springfox.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.swagger</groupId>
          <artifactId>swagger-annotations</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.swagger</groupId>
          <artifactId>swagger-models</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>${springfox.version}</version>
    </dependency>
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-annotations</artifactId>
      <version>${swagger-annotations.version}</version>
    </dependency>
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-models</artifactId>
      <version>${swagger-models.version}</version>
    </dependency>

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
      <version>${confluent.kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${confluent.kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-streams-avro-serde</artifactId>
      <version>${confluent.kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka-clients.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-tools</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>org.junit.vintage</groupId>
          <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <version>${spring-cloud.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <excludes>
          <exclude>local.yaml</exclude>
          <exclude>avro/*</exclude>
        </excludes>
        <filtering>true</filtering>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro-maven-plugin.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
              <goal>protocol</goal>
              <goal>idl-protocol</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

SpringIntegrationService.java

@Component
@EnableBinding(SpringIntegrationService.KafkaTopicBindings.class)
public class SpringIntegrationService {
  private static Logger logger = LoggerFactory.getLogger(SpringIntegrationService.class);
  private MessageChannel someChannel;

  public interface KafkaTopicBindings {
    String MY_KAFKA_TOPIC_BINDER_CHANNEL = "MyKafkaTopicBinderChannel";

    @Output(KafkaTopicBindings.MY_KAFKA_TOPIC_BINDER_CHANNEL)
    MessageChannel someChannel();
  }

  public SpringIntegrationService(KafkaTopicBindings bindings) {
    this.someChannel = bindings.someChannel();
  }

  @ServiceActivator(inputChannel = "entry.kafka")
  public boolean entryKafka(Message<someChannel> msg) {
    logger.info("entryKafka(): Payload: {}", msg.getPayload());

    try {
      return someChannel.send(MessageBuilder.withPayload(msg.getPayload())
          .setHeader(KafkaHeaders.MESSAGE_KEY, "Some Key").build());
    } catch (Exception e) {
      logger.warn("entryKafka(): Failed to send message onto someChannel topic", e);
      return false;
    }
  }
}

我不确定什么对您不起作用,而且在没有看到您提供的完整示例的情况下无法说。所以这是一个我们可以用作起点的简单示例:

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class,
                "--spring.cloud.function.definition=uppercase",
                "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
                "--spring.cloud.stream.bindings.uppercase-in-0.group=uppercase.group");
    }

   @Bean
    public Function<String, String> uppercase() {
       return v -> v.toUpperCase();
    }
}

这将导致适当的 creation/assignment 消费者组。你是说以上对你不起作用?

这里是repo where I tried to update the application mentioned in the blog。清理了配置中的一些内容并将示例更新为使用新的功能模型。我证实这有效。你能 运行 把它和你的设置进行比较吗?如果您可以将此示例作为报告任何潜在问题的一种方式,那将有助于我们进一步为您提供帮助。