Python Flink Stateful 函数入口的 Kafka Key 访问

Kafka Key access on Ingress of a Python Flink Stateful function

我一直在研究 Flink Stateful Functions。它看起来非常有前途 - 除了一件事 - 我希望我只是想念它。

对于我来说,在 Python 中看不到从 kafka ingress 访问 kafka 密钥的方法。在 Java 中,我看到我可以使用反序列化器并将其有效地打包到解码的 message 对象中。但是我找不到替代方案。
在我们的例子中,键有有价值的信息,值中不存在。

有人遇到过这个 - 还是我刚刚错过了?

我想提的第一件事是,用于远程功能的内置 Kafka 入口要求密钥可以解释为 UTF-8 字符串。 如果这确实是你的情况,那么你可以通过以下方式简单地获得它:

def example(context, message):
  key = context.address.id 
  ... 
  print(key) # utf8 string

如果不是这种情况,那么不幸的是,此时您必须使用嵌入式 SDK,在将消息转发到远程函数之前提取键和值。