Python Flink Stateful 函数入口的 Kafka Key 访问
Kafka Key access on Ingress of a Python Flink Stateful function
我一直在研究 Flink Stateful Functions。它看起来非常有前途 - 除了一件事 - 我希望我只是想念它。
对于我来说,在 Python 中看不到从 kafka ingress 访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反序列化器并将其有效地打包到解码的 message
对象中。但是我找不到替代方案。
在我们的例子中,键有有价值的信息,值中不存在。
有人遇到过这个 - 还是我刚刚错过了?
我想提的第一件事是,用于远程功能的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。
如果这确实是你的情况,那么你可以通过以下方式简单地获得它:
def example(context, message):
key = context.address.id
...
print(key) # utf8 string
如果不是这种情况,那么不幸的是,此时您必须使用嵌入式 SDK,在将消息转发到远程函数之前提取键和值。
我一直在研究 Flink Stateful Functions。它看起来非常有前途 - 除了一件事 - 我希望我只是想念它。
对于我来说,在 Python 中看不到从 kafka ingress 访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反序列化器并将其有效地打包到解码的 message
对象中。但是我找不到替代方案。
在我们的例子中,键有有价值的信息,值中不存在。
有人遇到过这个 - 还是我刚刚错过了?
我想提的第一件事是,用于远程功能的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。 如果这确实是你的情况,那么你可以通过以下方式简单地获得它:
def example(context, message):
key = context.address.id
...
print(key) # utf8 string
如果不是这种情况,那么不幸的是,此时您必须使用嵌入式 SDK,在将消息转发到远程函数之前提取键和值。