Writing to BigQuery from within a ParDo function

I would like to call a beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function, so that I can generate a separate BigQuery table for each key in the PCollection (I am using the Python SDK). Here are two similar threads, which unfortunately did not help:

1)

2)

When I execute the following pipeline, the rows for the first key are inserted into BigQuery and then the pipeline fails with the error below. Any suggestions on what I am doing wrong, or on how to fix it, would be much appreciated.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
                        beam.io.BigQuerySink(
                            'PROJECT-NAME:analytics.first_table', # will be replaced by a dynamic name based on the key
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                            )
            )
        ### End block ######
        return [value] 


### Following part works fine ###
filtered = (rows    | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
                    | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
                    | 'group_by_key' >> beam.GroupByKey() 
                    | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
                    | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing
                )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.another_table',
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
  File "split_events.py", line 137, in <module>
    run()
  File "split_events.py", line 132, in run
    p.run()
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
    runner.process(v)
  File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
  File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
  File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
  File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
    return self.run(self.dofn.process, context, args, kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
    result = method(context, *args, **kwargs)
  File "split_events.py", line 73, in process
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
    return _MaterializePValues(cache).visit(result)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
    return self._pvalue_cache.get_unwindowed_pvalue(node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
    return [v.value for v in self.get_pvalue(pvalue)]
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"

Edit (after the first answer):

I did not realize that my value needed to be a PCollection.

I have now changed my code to this (which is probably very inefficient):

key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))
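
For context, this fragment presumably sits inside a loop over the keys at pipeline-construction time. A fuller version of the workaround might look like the sketch below, where per_key_rows is a hypothetical {key: [row, ...]} dict materialized before the pipeline is built:

# per_key_rows: hypothetical dict of key -> list of row dicts, prepared up front
for key, value in per_key_rows.iteritems():  # Python 2, matching the question
    key_pipe = p | 'pipe_' + key >> beam.Create(value)
    key_pipe | 'write_' + key >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.%s' % key,  # one table per key
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))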

This now works fine locally, but not with the BlockingDataflowPipelineRunner :-(

The pipeline fails with the following error:

    JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
    op.start()
  File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
    def start(self):
  File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)

In the similar threads, the only suggestion for doing BigQuery write operations inside a ParDo was to use the BigQuery API directly, or to use a client.

The code that you wrote puts a Dataflow write transform, beam.io.Write(beam.io.BigQuerySink()), inside a DoFn function. The ParDo class expects to work on a PCollection, like filtered in the working code example. That is not the case for the non-functioning code, which operates on the plain value.
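
To make the distinction concrete, here is a minimal sketch of where each kind of code runs (the runner name follows the old-SDK style visible in the traceback):

import apache_beam as beam

p = beam.Pipeline('DirectPipelineRunner')

# Pipeline-construction time: '|' composes deferred transforms over
# PCollections; nothing executes until p.run().
pcoll = p | 'create' >> beam.Create([('k', [1, 2])])  # pcoll is a PCollection
pcoll | 'ok' >> beam.Map(lambda kv: kv)               # fine: a new graph node

# Execution time is different: inside DoFn.process, context.element is a
# plain Python (key, values) pair, not a PCollection, so piping it into
# beam.io.Write(...) cannot attach to the pipeline graph, hence the KeyError.

p.run()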

I think the easiest option would be to look at the gcloud-python BigQuery function insert_data() and put it inside your ParDo.
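
A minimal sketch of that idea, assuming the gcloud-python API of that era: the class name BigQueryStreamingUpload and its parameters are illustrative, and insert_data() streams into tables that must already exist, taking each row as a tuple in schema field order.

from gcloud import bigquery
import apache_beam as beam

class BigQueryStreamingUpload(beam.DoFn):
    """Streams each key's rows into its own table via the BigQuery API."""

    def __init__(self, project, dataset_name):
        super(BigQueryStreamingUpload, self).__init__()
        self.project = project
        self.dataset_name = dataset_name
        self._client = None  # created lazily; clients are not picklable

    def process(self, context):
        key, rows = context.element  # rows: iterable of dicts from GroupByKey
        if self._client is None:
            self._client = bigquery.Client(project=self.project)
        table = self._client.dataset(self.dataset_name).table(str(key))
        table.reload()  # fetch the table metadata, including the schema
        # insert_data() expects tuples ordered like table.schema
        data = [tuple(row[field.name] for field in table.schema)
                for row in rows]
        errors = table.insert_data(data)
        if errors:
            raise RuntimeError('BigQuery insert failed: %s' % errors)
        return [rows]

The par_upload step would then become something like 'par_upload_to_bigquery' >> beam.ParDo(BigQueryStreamingUpload('PROJECT-NAME', 'analytics')), with the per-key tables created (e.g. via table.create()) beforehand.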