Micronaut-Kafka with auto.commit.enable=false: how to commit an offset manually
I only want to commit messages that were successfully saved to the database.
I know I have disabled auto-commit in application.yml:
micronaut:
  application:
    name: demoGrpcKafka
  executors:
    consumer:
      type: fixed
      nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
  bootstrap:
    servers: localhost:9092
  consumers:
    default:
      auto:
        commit:
          enable: false
  producers:
    #default:
    demo-producer:
      retries: 2
The consumer:
package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaListener(groupId = "myGroup")
class DemoConsumer {

    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
    ) {
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp ")
        // saved to database
        // THE ISSUE IS HERE: how to commit like consumer.commitOffsets(true) ?????
    }
}
In other words, how can I manually commit a message with commitOffset, commitSync(), or any other alternative while using Micronaut-Kafka?
*** Second edit
I went back to application.yaml:
consumers:
  default:
    auto:
      commit:
        enable: false
*** Third edit
I tried adding io.micronaut.configuration.kafka.Acknowledgement (deprecated) or importing io.micronaut.messaging.Acknowledgement instead; either of them resulted in
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
It seems I have to do something else for Micronaut to inject such an acknowledgement object. What am I missing?
package com.tolearn.consumer

import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
//import io.micronaut.messaging.Acknowledgement
//import io.micronaut.messaging.annotation.Header

@KafkaListener(
    groupId = "myGroup",
    offsetStrategy = OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {

    @Topic("demotopic")
    fun receive(@KafkaKey key: String?,
                acknowledgement: Acknowledgement,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                timestamp: Long
                //,header: Header
    ) {
        println("Key = $key " +
                "msg = $msg " +
                "offset = $offset " +
                "partition = $partition " +
                "topic = $topic " +
                "timestamp = $timestamp "
                // + "header = $header"
        )
        // saved to database
        // how to commit like consumer.commitOffsets(true)
        //Consumer.commitSync()
        acknowledgement.ack()
    }
}
The whole log is:
18:13:13.812 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Kafka consumer [com.tolearn.consumer.DemoConsumer@17e970dd] failed to deserialize value: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition demotopic-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:73)
at io.micronaut.configuration.kafka.serde.JsonSerde.deserialize(JsonSerde.java:82)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
at org.apache.kafka.clients.consumer.internals.Fetcher.access00(Fetcher.java:130)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1432)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process(KafkaConsumerProcessor.java:396)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'name': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"name: "Hello"
"; line: 1, column: 6]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3564)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2899)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchNull(UTF8StreamJsonParser.java:2870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:844)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4513)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3529)
at io.micronaut.jackson.serialize.JacksonObjectSerializer.deserialize(JacksonObjectSerializer.java:71)
... 18 common frames omitted
18:13:13.812 [consumer-executor-thread-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=demo-grpc-kafka-demo-consumer, groupId=myGroup] Seeking to offset 5 for partition demotopic-0
According to the documentation, you can set the offsetStrategy in the KafkaListener annotation, for example:
@KafkaListener(groupId = "myGroup", offsetStrategy = OffsetStrategy.SYNC)
class DemoConsumer {

    @Topic("testkey")
    fun receive(@KafkaKey key: String?,
    [...]
Choose one of the following options:
ASYNC: Asynchronously commit offsets using Consumer.commitAsync() after each batch of messages is processed.
ASYNC_PER_RECORD: Asynchronously commit offsets using Consumer.commitAsync() after each ConsumerRecord is consumed.
AUTO: Automatically commit offsets with the Consumer.poll(long) loop.
DISABLED: Do not commit offsets.
SYNC: Synchronously commit offsets using Consumer.commitSync() after each batch of messages is processed.
SYNC_PER_RECORD: Synchronously commit offsets using Consumer.commitSync() after each ConsumerRecord is consumed.
If I read your question correctly, you want to set it to SYNC.
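If you really want to commit only after the database save has succeeded, one option is to disable offset management entirely and commit yourself. A minimal sketch, assuming the micronaut-kafka feature that lets a listener method receive the underlying Kafka Consumer as an argument (the class name ManualCommitConsumer is hypothetical):

```kotlin
package com.tolearn.consumer

import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// DISABLED: micronaut-kafka never commits on its own; we commit after the DB save.
@KafkaListener(groupId = "myGroup", offsetStrategy = OffsetStrategy.DISABLED)
class ManualCommitConsumer {

    @Topic("demotopic")
    fun receive(@KafkaKey key: String?,
                msg: String,
                offset: Long,
                partition: Int,
                topic: String,
                consumer: Consumer<*, *>) {
        // save to database here; only reach commitSync() if the save succeeded
        consumer.commitSync(mapOf(
            // commit offset + 1: the committed offset is the NEXT record to read
            TopicPartition(topic, partition) to OffsetAndMetadata(offset + 1)
        ))
    }
}
```

With SYNC_PER_RECORD plus Acknowledgement (your third edit) you get similar per-record control; the sketch above just makes the commit call explicit. Note that if the save throws, the offset is never committed and the record will be redelivered.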