如何在 Flink Stateful Functions 应用程序中创建自动保存点?
How to make an automatic savepoint in Flink Stateful Functions application?
我正在尝试深入研究新的有状态函数方法,并且我已经尝试手动创建保存点 (https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/state-bootstrap.html#creating-a-savepoint)。
它就像一个魅力,但我找不到如何自动执行它的方法。例如,我有几百万个密钥,我需要将它们全部写入保存点。
您的问题是如何将示例中的 env.fromElements
替换为从文件或其他数据源中读取的内容? Flink 的 DataSet API,也就是这里使用的,可以从任何 HadoopInputFormat
读取。有关详细信息,请参阅 DataSet Connectors。
常见情况有易于使用的快捷方式。如果您只想使用 TextInputFormat
从文件中读取数据,那将如下所示:
env.readTextFile(path)
并使用 CsvInputFormat
:
从 CSV 文件中读取
env.readCsvFile(path)
有关使用这些快捷方式的更多信息,请参阅 Data Sources。
如果我误解了问题,请澄清您的疑虑。
我正在尝试深入研究新的有状态函数方法,并且我已经尝试手动创建保存点 (https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-and-operations/state-bootstrap.html#creating-a-savepoint)。
它就像一个魅力,但我找不到如何自动执行它的方法。例如,我有几百万个密钥,我需要将它们全部写入保存点。
您的问题是如何将示例中的 env.fromElements
替换为从文件或其他数据源中读取的内容? Flink 的 DataSet API,也就是这里使用的,可以从任何 HadoopInputFormat
读取。有关详细信息,请参阅 DataSet Connectors。
常见情况有易于使用的快捷方式。如果您只想使用 TextInputFormat
从文件中读取数据,那将如下所示:
env.readTextFile(path)
并使用 CsvInputFormat
:
env.readCsvFile(path)
有关使用这些快捷方式的更多信息,请参阅 Data Sources。
如果我误解了问题,请澄清您的疑虑。