从 Kafka Consumer 内部,如何获取消息头
From inside of Kafka Consumer, how get the message header
简而言之,我想让消费者收听消息头,但我收到“未指定必需参数 [Header header]”。
以下是我尝试失败的原因:
package com.tolearn.consumer
import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaListener(
groupId="myGroup",
offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
header: Header,
acknowledgement: Acknowledgement
){
println("Key = $key " +
"msg = $msg " +
"header = $header"
)
acknowledgement.ack();
}
}
它打印出这个错误:
00:02:05.890 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = demotopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1607828525818, serialized key size = 36, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = My-Header, value = [109, 121, 72, 101, 97, 100, 101, 114])], isReadOnly = false), key = 2afe3f0d-40c0-44f6-93a3-cce06678df80, value = name: "Hello"
)]] for Kafka consumer [com.tolearn.consumer.DemoConsumer@4ca2237e] produced error: Required argument [Header header] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [Header header] not specified
at io.micronaut.core.bind.DefaultExecutableBinder.bind(DefaultExecutableBinder.java:88)
at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process(KafkaConsumerProcessor.java:494)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
如果它添加了一些东西,这里是生产者:
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
public interface DemoProducer {
@Topic("demotopic")
fun sendDemoMsg(
@KafkaKey key: String?,
@Header("My-Header") myHeader: String,
msg: String?) {
}
}
服务发送带有header throw producer的消息
package com.tolearn.service
import com.tolearn.producer.DemoProducer
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class DemoService {
@Inject
@Named("dp")
lateinit var dp : DemoProducer
fun postDemo(key: String, msg: String){
//blocking
dp.sendDemoMsg(key, "myHeader", msg)
}
}
我期待打印... My-Header: myHeader
*** 编辑
对于未来的读者,我的解决方案变成了:
aplication.yml
micronaut:
application:
name: demoGrpcKafka
executors:
consumer:
type: fixed
nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
bootstrap:
servers: localhost:9092
consumers:
default:
auto:
commit:
enable: false
producers:
#default:
demo-producer:
retries: 2
my:
application:
token: tokenFromYml
制作人
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
@Header(name = "X-Token", value = "${my.application.token}")
public interface DemoProducer {
@Topic("demotopic")
fun sendDemoMsg(
@KafkaKey key: String?,
msg: String?) {
}
}
消费者
package com.tolearn.consumer
import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.KafkaHeaders
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.MessageHeaders
@KafkaListener(
groupId="myGroup",
offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
headers: MessageHeaders,
acknowledgement: Acknowledgement,
offset: Long,
partition: Int,
topic: String,
timestamp: Long
){
val h = (headers).get("X-Token")
println("Key = $key " +
"msg = $msg " +
"offset = $offset " +
"partition = $partition " +
"topic = $topic " +
"timestamp = $timestamp " +
"header = $h"
)
acknowledgement.ack();
}
}
看来你应该尝试使用 io.micronaut.messaging.MessageHeaders 而不仅仅是 kafka Header class.
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
headers: MessageHeaders,
acknowledgement: Acknowledgement)
您也可以像在制作方那样使用 @io.micronaut.messaging.annotation.Header("My-Header") 指定 header 的值消费者。
不要忘记添加 @javax.annotation.Nullable 如果 header 是可选的,或者只是 String? 在 Kotlin 的情况下。
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
@Nullable @Header("My-Header") myHeader: String,
acknowledgement: Acknowledgement)
简而言之,我想让消费者收听消息头,但我收到“未指定必需参数 [Header header]”。
以下是我尝试失败的原因:
package com.tolearn.consumer
import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaListener(
groupId="myGroup",
offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
header: Header,
acknowledgement: Acknowledgement
){
println("Key = $key " +
"msg = $msg " +
"header = $header"
)
acknowledgement.ack();
}
}
它打印出这个错误:
00:02:05.890 [consumer-executor-thread-1] ERROR i.m.c.k.e.KafkaListenerExceptionHandler - Error processing record [Optional[ConsumerRecord(topic = demotopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1607828525818, serialized key size = 36, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = My-Header, value = [109, 121, 72, 101, 97, 100, 101, 114])], isReadOnly = false), key = 2afe3f0d-40c0-44f6-93a3-cce06678df80, value = name: "Hello"
)]] for Kafka consumer [com.tolearn.consumer.DemoConsumer@4ca2237e] produced error: Required argument [Header header] not specified
io.micronaut.core.bind.exceptions.UnsatisfiedArgumentException: Required argument [Header header] not specified
at io.micronaut.core.bind.DefaultExecutableBinder.bind(DefaultExecutableBinder.java:88)
at io.micronaut.configuration.kafka.processor.KafkaConsumerProcessor.lambda$process(KafkaConsumerProcessor.java:494)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
如果它添加了一些东西,这里是生产者:
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
public interface DemoProducer {
@Topic("demotopic")
fun sendDemoMsg(
@KafkaKey key: String?,
@Header("My-Header") myHeader: String,
msg: String?) {
}
}
服务发送带有header throw producer的消息
package com.tolearn.service
import com.tolearn.producer.DemoProducer
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
@Singleton
class DemoService {
@Inject
@Named("dp")
lateinit var dp : DemoProducer
fun postDemo(key: String, msg: String){
//blocking
dp.sendDemoMsg(key, "myHeader", msg)
}
}
我期待打印... My-Header: myHeader
*** 编辑
对于未来的读者,我的解决方案变成了:
aplication.yml
micronaut:
application:
name: demoGrpcKafka
executors:
consumer:
type: fixed
nThreads: 1
#kafka.bootstrap.servers: localhost:9092
kafka:
bootstrap:
servers: localhost:9092
consumers:
default:
auto:
commit:
enable: false
producers:
#default:
demo-producer:
retries: 2
my:
application:
token: tokenFromYml
制作人
package com.tolearn.producer
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
@KafkaClient(
id = "demo-producer",
acks = KafkaClient.Acknowledge.ALL)
@Header(name = "X-Token", value = "${my.application.token}")
public interface DemoProducer {
@Topic("demotopic")
fun sendDemoMsg(
@KafkaKey key: String?,
msg: String?) {
}
}
消费者
package com.tolearn.consumer
import io.micronaut.configuration.kafka.Acknowledgement
import io.micronaut.configuration.kafka.KafkaHeaders
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetStrategy
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.MessageHeaders
@KafkaListener(
groupId="myGroup",
offsetStrategy=OffsetStrategy.SYNC_PER_RECORD
)
class DemoConsumer {
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
headers: MessageHeaders,
acknowledgement: Acknowledgement,
offset: Long,
partition: Int,
topic: String,
timestamp: Long
){
val h = (headers).get("X-Token")
println("Key = $key " +
"msg = $msg " +
"offset = $offset " +
"partition = $partition " +
"topic = $topic " +
"timestamp = $timestamp " +
"header = $h"
)
acknowledgement.ack();
}
}
看来你应该尝试使用 io.micronaut.messaging.MessageHeaders 而不仅仅是 kafka Header class.
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
headers: MessageHeaders,
acknowledgement: Acknowledgement)
您也可以像在制作方那样使用 @io.micronaut.messaging.annotation.Header("My-Header") 指定 header 的值消费者。
不要忘记添加 @javax.annotation.Nullable 如果 header 是可选的,或者只是 String? 在 Kotlin 的情况下。
@Topic("demotopic")
fun receive(@KafkaKey key: String?,
msg: String,
@Nullable @Header("My-Header") myHeader: String,
acknowledgement: Acknowledgement)