在 Apache Flink 中使用接收器进行读取?

Using sink in Apache Flink for read purposes?

我是 Apache Flink(和 Whosebug)的新手,我想知道处理以下场景的最佳实践:

我目前正在使用来自其他应用程序的 KafkaSource 使用实时消息。如果这些消息中的键存在于我创建并有权访问的本地数据库中,则其中一些消息将需要进行转换。然后需要将这个转换后的消息一个一个地发送到 KafkaSink。

为了检查消息是否需要转换,我需要查看该特定消息的密钥是否存在于我的本地数据库中(我必须为每条消息查询本地数据库以检查其密钥) .

执行此操作的有效方法是什么?

我有两个想法:

  1. 打开与本地数据库的连接并执行查询以查看我的本地数据库中是否存在该消息的记录。对流中的每条消息重复此操作。

  2. 扩展flink RichSinkFunction 并通过它打开一个连接并使用invoke 方法执行查询。使用此 RichSink 为流中的每条消息重复此操作。

性能问题:我只想打开一次与本地数据库的连接。我认为方法 #1 会打开和关闭每条消息的连接,而方法 #2 只会打开和关闭一次连接。

更一般地说,创建 RichSink 是否适合仅 运行 本地数据库中的一些查询以供读取?我不会使用这个 RichSink 将任何数据实际写入本地数据库。

谢谢!

从 Flink 访问外部系统的首选方法是使用 AsyncFunctionhttps://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio/

也就是说,如果您的数据库可以处理负载并且足够快以跟上流吞吐量。如果没有,您将希望从数据库中实现某种 CDC 流并将其内容存储在本地作为 Flink 状态。然后,有一个 ConnectedStream 以便它们都可以在 CoMapCoFlatMap 运算符中共享状态。

ConnectedStreamAsyncFunction 是解决此类问题的首选方法。

如果您无法访问所有 Flink 抽象(例如,如果您在 Flink 之上有一些现有框架)但您可以实例化 FlatMapFunction,您可以求助于 RichFlatMapFunction - 你如果您使用 open 方法来实例化它,那么以这种方式只维护几个与数据库的连接。