无法在 spring-cloud-stream-binder-kafka-streams:3.1.1 中设置 GroupId
Unable to set GroupId in spring-cloud-stream-binder-kafka-streams:3.1.1
我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1
函数式编程。我尝试了多种组合来设置 GroupId,但 Consumer 始终将 GroupId 打印为 spring.application.name.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nt.utility</groupId>
<artifactId>kafka-messages</artifactId>
<version>2.0.4</version>
<packaging>jar</packaging>
<name>kafka-messages</name>
<description>Kafka Messages</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<lombok-version>1.16.8</lombok-version>
<maven-version>2.2.4.RELEASE</maven-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${maven-version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<shutdown>kill</shutdown>
</configuration>
</plugin>
</plugins>
</build>
</project>
Java代码
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input;
}
}
application.yml
spring:
application.name: kafka-messages
cloud:
stream:
function:
definition: process
bindings:
process-in-0:
destination: words
group: group-1
process-out-0:
destination: counts
kafka:
bindings:
process-out-0:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
process-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
streams:
binder:
brokers: localhost:9092
auto-create-topics: false
日志
2021-02-26 23:29:03.677 INFO 42872 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Subscribed to topic(s): words
2021-02-26 23:29:03.809 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Discovered group coordinator 127.0.0.1:9092 (id: 2147483647 rack: null)
2021-02-26 23:29:03.811 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] (Re-)joining group
2021-02-26 23:29:06.705 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Setting offset for partition words-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 0 rack: null)], epoch=0}}
如您所见,groupId 不是从 yml 文件中设置的。我在这上面花了很多时间,但没有运气。请帮忙。
更新
当我不使用流 API 并将我的 pom 依赖项修改为 spring-cloud-stream-binder-kafka:3.1.1[=16= 时,GroupId 似乎已正确应用]
这与Spring无关;由 KafkaStreams 自己设置。
https://kafka.apache.org/documentation/#streamsconfigs
application.id
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1
函数式编程。我尝试了多种组合来设置 GroupId,但 Consumer 始终将 GroupId 打印为 spring.application.name.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nt.utility</groupId>
<artifactId>kafka-messages</artifactId>
<version>2.0.4</version>
<packaging>jar</packaging>
<name>kafka-messages</name>
<description>Kafka Messages</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<lombok-version>1.16.8</lombok-version>
<maven-version>2.2.4.RELEASE</maven-version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${maven-version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
<shutdown>kill</shutdown>
</configuration>
</plugin>
</plugins>
</build>
</project>
Java代码
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input;
}
}
application.yml
spring:
application.name: kafka-messages
cloud:
stream:
function:
definition: process
bindings:
process-in-0:
destination: words
group: group-1
process-out-0:
destination: counts
kafka:
bindings:
process-out-0:
producer:
configuration:
key.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serializer: org.apache.kafka.common.serialization.Serdes$StringSerde
process-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
value.deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
streams:
binder:
brokers: localhost:9092
auto-create-topics: false
日志
2021-02-26 23:29:03.677 INFO 42872 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Subscribed to topic(s): words
2021-02-26 23:29:03.809 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Discovered group coordinator 127.0.0.1:9092 (id: 2147483647 rack: null)
2021-02-26 23:29:03.811 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] (Re-)joining group
2021-02-26 23:29:06.705 INFO 42872 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=kafka-messages-97a5b62c-b3a2-464f-84c2-5014818b7574-StreamThread-1-consumer, groupId=kafka-messages] Setting offset for partition words-0 to the committed offset FetchPosition{offset=4, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[127.0.0.1:9092 (id: 0 rack: null)], epoch=0}}
如您所见,groupId 不是从 yml 文件中设置的。我在这上面花了很多时间,但没有运气。请帮忙。
更新
当我不使用流 API 并将我的 pom 依赖项修改为 spring-cloud-stream-binder-kafka:3.1.1[=16= 时,GroupId 似乎已正确应用]
这与Spring无关;由 KafkaStreams 自己设置。
https://kafka.apache.org/documentation/#streamsconfigs
application.id
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.