Apache Beam pipeline with JdbcIO

I have an Apache Beam pipeline that reads from BigQuery and then tries to write the results to Postgres. The code uses the JdbcIO connector and the Dataflow runner. I'm using Python 3.8.7 and Apache Beam 2.28.0.

I'm using the default expansion service. I also tried running a custom expansion service, but I still get the same error. Any ideas?

Here is the code:

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc
from typing import NamedTuple


def export_to_postgres(user_options, pipeline_options, password):
    """Creates a pipeline that writes entities to postgres."""

    # Schema'd row type; RowCoder encodes it as a Beam Row so that the
    # Java-side JdbcIO transform can decode the elements.
    TeacherRow = NamedTuple(
        "TeacherRow",
        [
            ("teacher_id", str),
            ("first_name", str),
            ("last_name", str),
            ("total_all_publisher", int)
        ])

    coders.registry.register_coder(TeacherRow, coders.RowCoder)

    p = beam.Pipeline(options=pipeline_options)

    (p
     | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                query=user_options.query_src,
                use_standard_sql=True
            )
     # ReadFromBigQuery emits each row as a Python dict keyed by column name.
     | beam.Map(lambda x:
                TeacherRow(teacher_id=str(x['teacher_id']),
                           first_name=str(x['first_name']),
                           last_name=str(x['last_name']),
                           total_all_publisher=int(x['total_all_publisher'])))
     .with_output_types(TeacherRow)
     | beam.WindowInto(beam.window.FixedWindows(10))
     .with_output_types(TeacherRow)
     | 'Write to jdbc' >> WriteToJdbc(
                table_name="teacher",
                driver_class_name='org.postgresql.Driver',
                jdbc_url='jdbc:{}://{}:{}/{}'.format("postgresql", "your ip address", "5432", "postgres"),
                username="postgres",
                # Use the password passed into the function instead of a literal.
                password=password
            )
     )
    p.run()
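Separately from the error below, the `beam.Map` step in the original code read fields as attributes (`x.teacher_id`), but `ReadFromBigQuery` yields each row as a Python dict keyed by column name. The following is a minimal sketch of that conversion pulled out into a plain function so it can be checked without running a pipeline; the helper name `to_teacher_row` and the sample row are hypothetical, and only the `TeacherRow` schema comes from the code above.

```python
from typing import Any, Dict, NamedTuple


# Same schema as the TeacherRow type in the pipeline above.
class TeacherRow(NamedTuple):
    teacher_id: str
    first_name: str
    last_name: str
    total_all_publisher: int


def to_teacher_row(bq_row: Dict[str, Any]) -> TeacherRow:
    """Converts one BigQuery result row (a dict keyed by column name) to a TeacherRow.

    to_teacher_row is a hypothetical helper name used for illustration.
    """
    return TeacherRow(
        teacher_id=str(bq_row["teacher_id"]),
        first_name=str(bq_row["first_name"]),
        last_name=str(bq_row["last_name"]),
        total_all_publisher=int(bq_row["total_all_publisher"]),
    )


if __name__ == "__main__":
    # Hypothetical sample row, shaped like a dict from ReadFromBigQuery.
    sample = {"teacher_id": 42, "first_name": "Ada", "last_name": "Lovelace",
              "total_all_publisher": "7"}
    print(to_teacher_row(sample))
    # prints TeacherRow(teacher_id='42', first_name='Ada', last_name='Lovelace', total_all_publisher=7)
```

In the pipeline, the lambda could then be replaced by `beam.Map(to_teacher_row).with_output_types(TeacherRow)`. Defining `TeacherRow` and the helper at module level, rather than inside `export_to_postgres`, tends to make them easier to pickle and ship to Dataflow workers.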

I get the following error:

  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 102, in <module>
    run()
  File "/Users/Trex/workspace/workflow/dataflow/bq-to-pg.py", line 97, in run
    export_to_postgres(user_options, pipeline_options, password)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 58, in export_to_postgres
    p.run()
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1

This is tracked as https://issues.apache.org/jira/browse/BEAM-12043; hopefully it will be fixed in the next release.
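Reading the traceback, `p.run()` calls `Pipeline.from_runner_api`, so the Python SDK appears to serialize the pipeline to its portable (Runner API) form and rebuild it in Python. During that rebuild every ParDo is decoded back into a Python DoFn, and the ParDos that the Java expansion service inserted for JdbcIO carry the URN `beam:dofn:javasdk:0.1`, which the Python decoder does not recognize. The sketch below is a simplified, hypothetical model of that URN dispatch, not Beam's actual implementation; apart from the Java URN and the error message, which come from the traceback, the names and the Python-side URN are illustrative.

```python
from typing import Callable, Dict, List, Tuple

# Illustrative URNs: the Java one is taken from the traceback above;
# the Python one stands in for whatever the SDK uses for pickled DoFns.
PYTHON_PICKLED_DOFN_URN = "beam:dofn:pickled_python_info:v1"
JAVA_DOFN_URN = "beam:dofn:javasdk:0.1"

# Hypothetical registry: only DoFn encodings this SDK can decode are listed.
DECODERS: Dict[str, Callable[[bytes], str]] = {
    PYTHON_PICKLED_DOFN_URN: lambda payload: "python DoFn (%d bytes)" % len(payload),
}


def dofn_from_spec(urn: str, payload: bytes) -> str:
    """Rebuilds one DoFn from its (urn, payload) spec, or raises for unknown URNs."""
    decoder = DECODERS.get(urn)
    if decoder is None:
        # Same message as the ValueError in the traceback above.
        raise ValueError("Unexpected DoFn type: %s" % urn)
    return decoder(payload)


def round_trip(pardo_specs: List[Tuple[str, bytes]]) -> List[str]:
    """Rebuilds every ParDo in order; fails on the first non-Python DoFn."""
    return [dofn_from_spec(urn, payload) for urn, payload in pardo_specs]


if __name__ == "__main__":
    specs = [(PYTHON_PICKLED_DOFN_URN, b"abc"), (JAVA_DOFN_URN, b"")]
    try:
        round_trip(specs)
    except ValueError as e:
        print(e)  # prints Unexpected DoFn type: beam:dofn:javasdk:0.1
```

In this model the failure does not depend on which expansion service produced the Java ParDo, which is consistent with the custom expansion service giving the same error.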