Kafka Processor API中的Header有什么用?
What is the use of Header in Kafka Processor API?
我正在学习 Kafka 处理器 API 并在 ProcessorContext
中找到一种方法 headers。
headers()
Returns the headers of the current input record; could be
null if it is not available
这个方法有什么用?
在docs中只写了一行:
Returns the headers of the current input record; could be null if it
is not available
我可以对此执行一些操作吗?
A header 是可以附加到每条消息的某种元数据。 Headers 可用于各种场景,如附加信息,可在过滤记录时使用等
您可以通过处理器 API 访问消息的元数据,更准确地说是 process()
、transform()
和 transformValues()
。对于 example,为了将 header 添加到记录中,可以使用以下方法:
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}
我正在学习 Kafka 处理器 API 并在 ProcessorContext
中找到一种方法 headers。
headers()
Returns the headers of the current input record; could be null if it is not available
这个方法有什么用?
在docs中只写了一行:
Returns the headers of the current input record; could be null if it is not available
我可以对此执行一些操作吗?
A header 是可以附加到每条消息的某种元数据。 Headers 可用于各种场景,如附加信息,可在过滤记录时使用等
您可以通过处理器 API 访问消息的元数据,更准确地说是 process()
、transform()
和 transformValues()
。对于 example,为了将 header 添加到记录中,可以使用以下方法:
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}