How do I get the messages from the offset in Spring Cloud Stream Kafka Binder?
I can connect to the topic, although I'm not sure how to get the messages from it. Here are the logs showing that I have 1076 records on the topic:
Logs
2021-10-05 11:10:09.053 INFO 17364 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Partitions revoked: []
2021-10-05 11:10:09.054 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] (Re-)joining group
2021-10-05 11:10:10.063 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] (Re-)joining group
2021-10-05 11:10:13.189 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] Successfully joined group with generation 1
2021-10-05 11:10:13.205 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=latest] Setting newly assigned partitions: MY_TOPIC-0
2021-10-05 11:10:13.278 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=latest] Found no committed offset for partition MY_TOPIC-0
2021-10-05 11:10:13.664 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=latest] Resetting offset for partition MY_TOPIC-0 to offset 1076.
2021-10-05 11:10:13.730 INFO 17364 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Partitions assigned: [MY_TOPIC-0]
2021-10-05 11:10:13.731 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=latest] Seeking to EARLIEST offset of partition MY_TOPIC-0
2021-10-05 11:10:13.787 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=latest] Resetting offset for partition MY_TOPIC-0 to offset 1076.
2021-10-05 11:46:44.345 INFO 17364 --- [thread | latest] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] Group coordinator test.kafka.com:6667 (id: 2246481044 rack: null) is unavailable or invalid, will attempt rediscovery
2021-10-05 11:46:51.625 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] Discovered group coordinator test.kafka.com:6667 (id: 2147482644 rack: null)
2021-10-05 11:46:56.514 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] Attempt to heartbeat failed for since member id consumer-2-386b3e7b-b8a1-48c5-9gd3-5e587e4237ad is not valid.
2021-10-05 11:46:56.516 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=latest] Revoking previously assigned partitions [MY_TOPIC-0]
2021-10-05 11:46:56.516 INFO 17364 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Partitions revoked: [MY_TOPIC-0]
2021-10-05 11:46:56.516 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] (Re-)joining group
2021-10-05 11:46:56.572 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] (Re-)joining group
2021-10-05 11:46:59.687 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=latest] Successfully joined group with generation 3
2021-10-05 11:46:59.688 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=latest] Setting newly assigned partitions: MY_TOPIC-0
2021-10-05 11:46:59.749 INFO 17364 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=latest] Setting offset for partition MY_TOPIC-0 to the committed offset FetchPosition{offset=1076, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=test.kafka.com:6667 (id: 1003 rack: /default-rack), epoch=2}}
2021-10-05 11:46:59.812 INFO 17364 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Partitions assigned: [MY_TOPIC-0]
Consumer class
public interface EventConsumer {
    @Input("my-group-id")
    SubscribableChannel consumeMessage();
}
Listener class
@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(EventConsumer.class)
public class EventListener {
    @StreamListener(target = "my-group-id")
    public void processMessage(Object msg) {
        // The {} placeholder is required; without it SLF4J drops the argument.
        log.info("*** MESSAGE: {} ***", msg);
        // do something
        // save messages
    }
}
Application.yml
spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
      max-poll-records: 10
  cloud:
    zookeeper:
      connect-string: test.kafka.com:2181,test.kafka.com:2181,test.kafka.com:2181
    stream:
      kafka:
        bindings:
          my-group-id:
            consumer:
              autoCommitOffset: false
        binder:
          brokers:
            - test.kafka.com:6667
            - test.kafka.com:6667
            - test.kafka.com:6667
          auto-create-topics: false
          auto-add-partitions: false
          jaas:
            controlFlag: REQUIRED
            loginModule: com.sun.security.auth.module.Krb5LoginModule
            options:
              useKeyTab: true
              storeKey: true
              serviceName: kafka
              keyTab: C:\files\user.keytab
              principal: user@test.com
              debug: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
      bindings:
        my-group-id:
          binder: kafka
          destination: MY_TOPIC
          group: test-kafka-service
  servlet:
    multipart:
      max-file-size: 50MB
      max-request-size: 50MB
spring.cloud.stream.kafka.bindings.my-group-id.consumer.resetOffsets: true
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 300000
Judging by the logs, execution never even reaches my listener class, where I put a logger. Any ideas on this?
To get the @StreamListener to start again at the beginning of the log, configure a group on the binding (so that auto.offset.reset is set to earliest) and set resetOffsets to true.
spring.cloud.stream.kafka.bindings.my-group-id.consumer.resetOffsets=true
resetOffsets
Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a KafkaBindingRebalanceListener is provided; see Using a KafkaBindingRebalanceListener. See Resetting Offsets for more information about this property.
Default: false.
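To make the effect of the property described above concrete, here is a minimal sketch of how the starting position for a partition is chosen. It is a simplified model, not the binder's actual code: the class StartPositionSketch, the method startPosition and the ResetPolicy enum are hypothetical names introduced for illustration, and the offsets used in main are borrowed from the demo logs further down (committed offset 3, beginning offset 0).

```java
import java.util.OptionalLong;

/** Simplified, hypothetical model of where a consumer starts reading a partition. */
final class StartPositionSketch {

    /** Stands in for startOffset / auto.offset.reset (earliest or latest). */
    enum ResetPolicy { EARLIEST, LATEST }

    static long startPosition(OptionalLong committed, ResetPolicy policy,
                              boolean resetOffsets, long beginningOffset, long endOffset) {
        // Offset the reset policy points at: start or end of the partition.
        long policyOffset = (policy == ResetPolicy.EARLIEST) ? beginningOffset : endOffset;
        if (resetOffsets) {
            // resetOffsets=true: ignore any committed offset and seek per the policy.
            return policyOffset;
        }
        // Otherwise resume from the committed offset, falling back to the policy
        // only when the group has no committed offset yet.
        return committed.orElse(policyOffset);
    }

    public static void main(String[] args) {
        // Committed offset 3, resetOffsets=true, earliest: replay from the beginning.
        System.out.println(startPosition(OptionalLong.of(3), ResetPolicy.EARLIEST, true, 0, 3));
        // Same group without resetOffsets: resume at the committed offset.
        System.out.println(startPosition(OptionalLong.of(3), ResetPolicy.EARLIEST, false, 0, 3));
        // No committed offset, latest: start at the end, so only new records arrive.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.LATEST, false, 0, 3));
    }
}
```

In this model a consumer that starts at the end offset receives nothing until new records are produced, which is consistent with a listener that is never invoked.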
EDIT
This works fine for me:
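import java.util.function.Consumer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;

@SpringBootApplication
public class So69432739Application {

    public static void main(String[] args) {
        SpringApplication.run(So69432739Application.class, args);
    }

    // Functional-style consumer; prints each record it receives.
    @Bean
    public Consumer<String> input() {
        return System.out::println;
    }

    // Sends three records to the topic on every startup.
    @Bean
    ApplicationRunner runner(KafkaOperations<byte[], byte[]> ops) {
        return args -> {
            ops.send("input-in-0", "one".getBytes());
            ops.send("input-in-0", "two".getBytes());
            ops.send("input-in-0", "three".getBytes());
        };
    }
}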
spring.cloud.stream.bindings.input-in-0.group=grp
spring.cloud.stream.kafka.bindings.input-in-0.consumer.reset-offsets=true
The second time I ran it:
one
two
three
one
two
three
Setting offset for partition input-in-0-0 to the committed offset FetchPosition{offset=3, ...
Resetting offset for partition input-in-0-0 to position FetchPosition{offset=0, ...
Note that I am using the newer functional style (@StreamListener is deprecated), although that makes no difference to this feature.
EDIT2
You can't mix properties and YAML like that; I copied your YAML (commenting out the parts I didn't need) and it works fine...
spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
      max-poll-records: 10
  cloud:
    stream:
      kafka:
        bindings:
          my-group-id:
            consumer:
              autoCommitOffset: false
              reset-offsets: true
#        binder:
#          brokers:
#            - test.kafka.com:6667
#            - test.kafka.com:6667
#            - test.kafka.com:6667
#          auto-create-topics: false
#          auto-add-partitions: false
#          jaas:
#            controlFlag: REQUIRED
#            loginModule: com.sun.security.auth.module.Krb5LoginModule
#            options:
#              useKeyTab: true
#              storeKey: true
#              serviceName: kafka
#              keyTab: C:\files\user.keytab
#              principal: user@test.com
#              debug: true
#          configuration:
#            security:
#              protocol: SASL_PLAINTEXT
      bindings:
        input-int-0:
          binder: kafka
          destination: input-in-0
          group: test-kafka-service
#  servlet:
#    multipart:
#      max-file-size: 50MB
#      max-request-size: 50MB
Setting offset for partition input-in-0-0 to the committed offset FetchPosition{offset=6 ...
Seeking to EARLIEST offset of partition input-in-0-0
Resetting offset for partition input-in-0-0 to position FetchPosition{offset=0 ...
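As a rough illustration of why the property belongs inside the YAML tree rather than as a separate dotted line, the sketch below flattens nested maps into dotted keys, which is the general idea behind how nested YAML ends up as property names. The class YamlFlattenSketch and its helpers flatten and node are hypothetical names for this example only; real YAML loading also handles lists and other cases that are left out here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: flattens nested maps (as parsed from YAML) into dotted keys. */
final class YamlFlattenSketch {

    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node,
                                    Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            // Join the parent path and the current key with a dot.
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                flattenInto(key, (Map<String, Object>) e.getValue(), out);
            } else {
                out.put(key, e.getValue());
            }
        }
    }

    /** Builds a single-entry map, standing in for one level of YAML nesting. */
    static Map<String, Object> node(String key, Object value) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put(key, value);
        return m;
    }

    public static void main(String[] args) {
        // Mirrors the reset-offsets entry nested under the consumer binding.
        Map<String, Object> yaml =
            node("spring", node("cloud", node("stream", node("kafka", node("bindings",
                node("my-group-id", node("consumer", node("reset-offsets", true))))))));
        // Prints the single flattened key and its value.
        System.out.println(flatten(yaml));
    }
}
```

Nesting reset-offsets under the consumer entry therefore yields the same dotted key as the one-line properties form, so only one of the two styles is needed per file.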