Pyspark - 定期从增量配置单元读取 table
Pyspark - read periodically from an incremental hive table
我正在使用 pyspark 处理一个用例。
我的 pyspark 作业应该定期从 Hive tables 读取并在其上应用一些聚合和转换。
但我无法每次都阅读完整的 table,因为我需要将输出附加到另一个 table.Can 任何人请提出任何想法。我正在考虑的一种方法是在每个进程之后跟踪配置单元 table 的 rowId 或 rownum。
Ps:这不是流式用例
注意:我是新手。
谢谢,
阿尔宾
让我们分解问题。
- 创建两个 table 来替换现有的:
- 创建基地 table 和三角洲 table。
- 创建一个合并了两个 table 的视图。
(用于为您提供截至 'now' 的所有数据的完整视图。已排除
来自视图的“处理”标记数据。我稍后会解释原因。)
- 添加数据时,它会添加到增量 table。
- 何时开始处理数据:将增量 table 中的数据标记为“正在处理”
- 将“处理中”数据复制到基础 table,并完成任何需要的 Process/update 聚合。
- 完成计算后,从增量中删除“正在处理”数据 table。
希望您现在清楚为什么要从 'now' 视图中排除标有“正在处理”的数据。
我正在使用 pyspark 处理一个用例。 我的 pyspark 作业应该定期从 Hive tables 读取并在其上应用一些聚合和转换。 但我无法每次都阅读完整的 table,因为我需要将输出附加到另一个 table.Can 任何人请提出任何想法。我正在考虑的一种方法是在每个进程之后跟踪配置单元 table 的 rowId 或 rownum。 Ps:这不是流式用例
注意:我是新手。
谢谢, 阿尔宾
让我们分解问题。
- 创建两个 table 来替换现有的:
- 创建基地 table 和三角洲 table。
- 创建一个合并了两个 table 的视图。 (用于为您提供截至 'now' 的所有数据的完整视图。已排除 来自视图的“处理”标记数据。我稍后会解释原因。)
- 添加数据时,它会添加到增量 table。
- 何时开始处理数据:将增量 table 中的数据标记为“正在处理”
- 将“处理中”数据复制到基础 table,并完成任何需要的 Process/update 聚合。
- 完成计算后,从增量中删除“正在处理”数据 table。
希望您现在清楚为什么要从 'now' 视图中排除标有“正在处理”的数据。