Dataflow Bigquery-Bigquery 管道在较小的数据上执行,而不是在大型生产数据集上执行

Dataflow Bigquery-Bigquery pipeline executes on smaller data, but not the large production dataset

这里是 Dataflow 的新手,但已经成功创建了一个运行良好的管道。

pipleine 从 BigQuery 中读取查询,应用 ParDo(NLP 函数),然后将数据写入新的 BigQuery table。

我要处理的数据集大约有 500GB 和 46M 条记录。

当我尝试使用相同数据的子集(大约 30 万条记录)时,它工作正常并且速度很快,请参见下文:

当我用完整的数据集尝试 运行 时,它开始非常快,但随后逐渐减少并最终失败。此时作业失败并添加了大约 900k 元素,大约 6-7GB,然后元素计数实际上开始减少。

我正在使用 250 个工人和一个 n1-highmem-6 机器类型

在工作日志中我得到了其中的一些(大约 10 个):

Info
2021-04-22 06:29:38.236 EDTRefreshing due to a 401 (attempt 1/2)

这是最后的警告之一:

2021-04-22 06:29:32.392 EDTS08:[85]: GetArticles/Read+[85]: GetArticles/_PassThroughThenCleanup/ParDo(PassThrough)/ParDo(PassThrough)+[85]: ExtractEntity+[85]: WriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal+[85]: WriteToBigQuery/BigQueryBatchFileLoads/AppendDestination+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/IdentityWorkaround+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Write+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(_ShardDestinations)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Write failed.

在执行细节中有多个:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

我假设这些来自数据集中较大的文本,可能需要一段时间才能处理,所以在处理这些项目后,下一个开始。

还有一些:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

综上所述,这对我来说有点令人困惑,而且不完全直观 - 尽管它工作时的服务很棒。

我正在从 Jupyter notebook 执行作业(不使用交互式 运行ner,只是执行脚本)。

主管道如下:

p = beam.Pipeline()

#Create a collection from Bigquery
articles = p | "GetArticles" >> beam.io.ReadFromBigQuery(query='SELECT id,uuid, company_id_id, title, full_text, FROM `MY TABLE` ', gcs_location=dataflow_gcs_location, project='my_project',use_standard_sql=True)

#Extract entities with NLP
entities = articles | "ExtractEntity" >> beam.ParDo(EntityExtraction()) 

#Write to bigquery 
entities | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('myproject:dataset.table', schema = schema,custom_gcs_temp_location=dataflow_gcs_location, create_disposition="CREATE_IF_NEEDED",write_disposition="WRITE_APPEND")  ```

我做错了什么?这是内存问题吗?我不应该像这样读写 BigQuery 而是输出到一个文件并从中创建一个 table 吗?希望得到一些帮助,很抱歉这么长 post,想提供尽可能多的上下文。

我发现 Dataflow 不太适合像这样的大型 NLP 批处理作业。我解决这个问题的方法是将较大的作业分成较小的可靠 运行 的作业。因此,如果您可以可靠地 运行 100K 文档,则只需 运行 500 个工作岗位。

可能晚了,但我在 BigQuery table 上做了一些测试,其中包含 770 万行字符串,需要处理大约 10 次。 350字。

我运行和你一样的管道:

  1. 从 BigQuery 读取数据
  2. 使用 python string
  3. 清理字符串
  4. 使用 Spacy fr_core_news_lg 模型,得到字符串的词形还原部分
  5. 将数据写回 BigQuery(在不同的table)

一开始我遇到了和你一样的问题,elements/sec的数量随着时间的推移而下降。

我意识到这是 RAM 的问题。我将 machine_typecustom-1-3072 更改为 custom-1-15360-ext,并从与您相同的配置文件转到了这个:

我认为 Dataflow 可以使用 NLP 模型处理大量行,但您必须确保为工作人员提供足够的 RAM。

此外,使用 number_of_worker_harness_threads=1 确保 Dataflow 不会生成多个线程(从而将 ram 拆分为线程)也很重要。

你也可以看看这个,最初的问题是一样的

最后一件事,我的工人的 CPU 利用率来自:

收件人:

这也是缺少 RAM 的标志。

编辑:我 运行 我的管道使用与您相同的数据量规模来确保我的测试没有偏差,结果是相同的:RAM 量似乎是关键工作运行顺利: