为什么我的 Python BigQuery Dataflow 接收器没有将记录插入数据库?
Why is my Python BigQuery Dataflow sink not inserting records into the database?
我正在使用 Python (2.7) 并在 Google 的 DataFlow 环境中工作,不用说,Google 还没有完全清除所有内容,并且文档还不够。但是,从 Dataflow 写入 BigQuery 的部分记录在此处 BigQuery Sink。
根据文档,为了指定架构,您需要输入一个字符串:
schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'
table名称、项目ID和数据集ID是这样的:'example_project_id:example_dataset_id.example_table_name'
现在,所有这些都在起作用。请参阅下面的代码,但据我所知,它正在成功创建 table 和字段。注意:项目 ID 设置为函数参数的一部分。
bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
现在,看起来我可以使用这个来插入东西:
bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)
但是由于某些原因,当将值打包到 PCollection 中时,它说它插入到 BigQuery 中,但是当我查询 table 时,它什么也没显示。
为什么没有插入?我没有看到任何错误,但没有向 BigQuery 插入任何内容。
这是 PCollection 中包含的数据的样子,我有将近 1,100 行要插入:
{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}
注意:我检查了日期格式,BigQuery 插入允许使用上面的日期格式。
我用您的确切架构和输入尝试了一个示例,它对我有用。我必须进行以下修复。
(1) 似乎您没有在参数中指定项目。您可能在管道定义中指定了它,因为您没有看到与此相关的错误。
(2) 您上面提到的代码中有错字。
'field_5: 'ExampleData'
应该是 'field_5': 'ExampleData'
但我假设这只是这个问题中的一个错字,而不是你原来的管道,因为你没有收到错误。
您运行使用的是最新版本的 Dataflow 吗?您可以尝试创建一个新的虚拟环境并运行 'pip install google-cloud-dataflow' 安装最新版本。
可以分享你的完整pipleine让我试用吗?
由于您使用的是 'DirectPipelineRunner',因此很难远程调试它。是否可以尝试 运行 使用 'DataflowPipelineRunner' 连接相同的管道(请注意,您需要一个为此启用计费的 GCP 项目)?如果您可以 运行 使用 'DataflowPipelineRunner' 并提供作业 ID,我将能够查看日志。
这个答案来得太晚了,但也许会对其他人有所帮助。你在管道中的写语句写错了。
bq_data | 'Write to BigQuery' >>
beam.io.Write(beam.io.BigQuerySink(known_args.output_table,
schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) # This is overwrite whatever you have in your table
我正在使用 Python (2.7) 并在 Google 的 DataFlow 环境中工作,不用说,Google 还没有完全清除所有内容,并且文档还不够。但是,从 Dataflow 写入 BigQuery 的部分记录在此处 BigQuery Sink。
根据文档,为了指定架构,您需要输入一个字符串:
schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'
table名称、项目ID和数据集ID是这样的:'example_project_id:example_dataset_id.example_table_name'
现在,所有这些都在起作用。请参阅下面的代码,但据我所知,它正在成功创建 table 和字段。注意:项目 ID 设置为函数参数的一部分。
bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
现在,看起来我可以使用这个来插入东西:
bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)
但是由于某些原因,当将值打包到 PCollection 中时,它说它插入到 BigQuery 中,但是当我查询 table 时,它什么也没显示。
为什么没有插入?我没有看到任何错误,但没有向 BigQuery 插入任何内容。
这是 PCollection 中包含的数据的样子,我有将近 1,100 行要插入:
{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}
注意:我检查了日期格式,BigQuery 插入允许使用上面的日期格式。
我用您的确切架构和输入尝试了一个示例,它对我有用。我必须进行以下修复。
(1) 似乎您没有在参数中指定项目。您可能在管道定义中指定了它,因为您没有看到与此相关的错误。
(2) 您上面提到的代码中有错字。
'field_5: 'ExampleData'
应该是 'field_5': 'ExampleData'
但我假设这只是这个问题中的一个错字,而不是你原来的管道,因为你没有收到错误。
您运行使用的是最新版本的 Dataflow 吗?您可以尝试创建一个新的虚拟环境并运行 'pip install google-cloud-dataflow' 安装最新版本。
可以分享你的完整pipleine让我试用吗?
由于您使用的是 'DirectPipelineRunner',因此很难远程调试它。是否可以尝试 运行 使用 'DataflowPipelineRunner' 连接相同的管道(请注意,您需要一个为此启用计费的 GCP 项目)?如果您可以 运行 使用 'DataflowPipelineRunner' 并提供作业 ID,我将能够查看日志。
这个答案来得太晚了,但也许会对其他人有所帮助。你在管道中的写语句写错了。
bq_data | 'Write to BigQuery' >>
beam.io.Write(beam.io.BigQuerySink(known_args.output_table,
schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) # This is overwrite whatever you have in your table