为什么我的 Python BigQuery Dataflow 接收器没有将记录插入数据库?

Why is my Python BigQuery Dataflow sink not inserting records into the database?

我正在使用 Python (2.7) 并在 Google 的 DataFlow 环境中工作,不用说,Google 还没有完全清除所有内容,并且文档还不够。但是,从 Dataflow 写入 BigQuery 的部分记录在此处 BigQuery Sink

根据文档,为了指定架构,您需要输入一个字符串:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'

table名称、项目ID和数据集ID是这样的:'example_project_id:example_dataset_id.example_table_name'

现在,所有这些都在起作用。请参阅下面的代码,但据我所知,它正在成功创建 table 和字段。注意:项目 ID 设置为函数参数的一部分。

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
        'example_dataset_id.{}'.format(bq_table_name),
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )
)

现在,看起来我可以使用这个来插入东西:

bq_data = pipeline | beam.Create(
    [{
        'field_1': 'ExampleIdentifier',
        'field_2': 'ExampleValue',
        'field_3': 'ExampleFieldValue',
        'created_at': '2016-12-26T05:50:39Z',
        'updated_at': '2016-12-26T05:50:39Z',
        'field_4': 'ExampleDataIdentifier',
        'field_5: 'ExampleData'
    }]
)

但是由于某些原因,当将值打包到 PCollection 中时,它说它插入到 BigQuery 中,但是当我查询 table 时,它什么也没显示。

为什么没有插入?我没有看到任何错误,但没有向 BigQuery 插入任何内容。

这是 PCollection 中包含的数据的样子,我有将近 1,100 行要插入:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}

注意:我检查了日期格式,BigQuery 插入允许使用上面的日期格式。

我用您的确切架构和输入尝试了一个示例,它对我有用。我必须进行以下修复。

(1) 似乎您没有在参数中指定项目。您可能在管道定义中指定了它,因为您没有看到与此相关的错误。 (2) 您上面提到的代码中有错字。 'field_5: 'ExampleData' 应该是 'field_5': 'ExampleData' 但我假设这只是这个问题中的一个错字,而不是你原来的管道,因为你没有收到错误。

您运行使用的是最新版本的 Dataflow 吗?您可以尝试创建一个新的虚拟环境并运行 'pip install google-cloud-dataflow' 安装最新版本。

可以分享你的完整pipleine让我试用吗?

由于您使用的是 'DirectPipelineRunner',因此很难远程调试它。是否可以尝试 运行 使用 'DataflowPipelineRunner' 连接相同的管道(请注意,您需要一个为此启用计费的 GCP 项目)?如果您可以 运行 使用 'DataflowPipelineRunner' 并提供作业 ID,我将能够查看日志。

这个答案来得太晚了,但也许会对其他人有所帮助。你在管道中的写语句写错了。

bq_data | 'Write to BigQuery' >> 
    beam.io.Write(beam.io.BigQuerySink(known_args.output_table, 
    schema=schema, 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) # This is overwrite whatever you have in your table