改进从 apache beam 中的转换内部读取 BQ 数据 - Python
Improving reading BQ data from inside of a transformation in apache beam - Python
我有一个提交到 google 云数据流的 Apache Beam 管道。
我的用例如下:
- 我有一个 BQ table A,我使用 Beam IO 本机连接器读取了一个 pcollection
- 我将 pcollection 传递给一个转换,我需要检查每个元素是否当前存在于 BQ 中的 table B 中。我找不到使用本机 Apache Beam 执行查询的方法,这就是我使用 google
的 BQ python 库的原因
- 之后,我使用光束 IO 连接器将我想要的结果保存到 BQ table C
我的方法效果不错,但成本很高。例如,对于 150k 条记录的 table A,至少需要 15 天的处理时间,在 google 数据流中有 475 名工作人员将其打包为 2 小时。我知道成本高和执行时间长的主要原因是我从转换内部为每个元素提交的 SQL 查询,因为这需要时间。
你们以前遇到过这样的问题吗?或者你知道我可以在我的代码中发明一种改进来降低成本吗?
一个简单优雅的side solution(不知道怎么想不出来)
我没有将 Table A 直接传递给要处理的光束作业,而是创建了一个新的 table A2,它是 table A 和 [= 之间左连接的结果20=]B.
所以我从工作人员那里请求的数据抛出 SQL 查询,将已经存在于作业的输入数据中 (table A2)
这样节省了大量的计算资源
我有一个提交到 google 云数据流的 Apache Beam 管道。 我的用例如下:
- 我有一个 BQ table A,我使用 Beam IO 本机连接器读取了一个 pcollection
- 我将 pcollection 传递给一个转换,我需要检查每个元素是否当前存在于 BQ 中的 table B 中。我找不到使用本机 Apache Beam 执行查询的方法,这就是我使用 google 的 BQ python 库的原因
- 之后,我使用光束 IO 连接器将我想要的结果保存到 BQ table C
我的方法效果不错,但成本很高。例如,对于 150k 条记录的 table A,至少需要 15 天的处理时间,在 google 数据流中有 475 名工作人员将其打包为 2 小时。我知道成本高和执行时间长的主要原因是我从转换内部为每个元素提交的 SQL 查询,因为这需要时间。
你们以前遇到过这样的问题吗?或者你知道我可以在我的代码中发明一种改进来降低成本吗?
一个简单优雅的side solution(不知道怎么想不出来)
我没有将 Table A 直接传递给要处理的光束作业,而是创建了一个新的 table A2,它是 table A 和 [= 之间左连接的结果20=]B.
所以我从工作人员那里请求的数据抛出 SQL 查询,将已经存在于作业的输入数据中 (table A2)
这样节省了大量的计算资源