Apache Beam stateful ParDo: Work token invalid

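I have a stateful DoFn that batches incoming elements: when the buffer reaches a certain size, it is flushed and the buffered elements are inserted into BigQuery. I have noticed that the pipeline occasionally raises an exception, although the exception does not stop the job from running. Here is the stack trace: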


Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 140, in process
    self._flush_buffer(buffer_state, count_state, buffer_size_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 162, in _flush_buffer
    rows = self._extract_rows(buffer_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 197, in _extract_rows
    for row in buffer.read():
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 510, in __iter__
    for elem in self.first:
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 846, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 886, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: INTERNAL: Work token invalid

The error is raised inside the process method while reading the elements back out of the buffer state, at rows = self._extract_rows(buffer_state).
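The trace suggests that buffer.read() does not fetch the whole bag up front: the failure comes from _lazy_iterator calling get_raw(state_key, continuation_token) while the for loop in _extract_rows is already running. The Beam-free model below illustrates that reading pattern. It assumes each page request is checked against a per-work-item token that stops being valid once the key has moved to another worker; FakeStateBackend, WorkTokenInvalid and lazy_bag_read are hypothetical names, not Beam or Dataflow internals.

```python
from typing import Iterator, List, Optional, Tuple


class WorkTokenInvalid(RuntimeError):
    """Hypothetical stand-in for 'INTERNAL: Work token invalid'."""


class FakeStateBackend:
    """Hypothetical runner-side state store that serves a bag in pages."""

    def __init__(self, items: List[str], page_size: int):
        self.items = items
        self.page_size = page_size
        self.valid_token = "work-1"  # token of the worker currently owning the key

    def get_raw(self, work_token: str,
                continuation: Optional[int]) -> Tuple[List[str], Optional[int]]:
        # Assumption: every page request is validated against the work token.
        if work_token != self.valid_token:
            raise WorkTokenInvalid("INTERNAL: Work token invalid")
        start = continuation or 0
        end = start + self.page_size
        next_continuation = end if end < len(self.items) else None
        return self.items[start:end], next_continuation


def lazy_bag_read(backend: FakeStateBackend, work_token: str) -> Iterator[str]:
    """Yield bag elements page by page, fetching a page only when it is needed."""
    continuation: Optional[int] = None
    while True:
        page, continuation = backend.get_raw(work_token, continuation)
        yield from page
        if continuation is None:
            return
```

Because lazy_bag_read is a generator, no request is made when it is called; the first page is fetched on the first iteration and later pages mid-loop, which is why an invalidated token surfaces inside the for loop rather than at the read() call.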

The DoFn is implemented exactly as in the example at https://beam.apache.org/blog/timely-processing/#example-batched-rpc
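For readers who have not opened the blog post, the buffer-and-flush logic can be modeled without Beam roughly as follows. This is a sketch, not the blog's code: the list and counter stand in for the bag and count state, sink stands in for the BigQuery insert, and the explicit final flush() stands in for the timer the blog example uses to flush a partial batch. BatchingBuffer and its parameters are hypothetical names.

```python
from typing import Callable, List


class BatchingBuffer:
    """Plain-Python model of a count-based buffer-and-flush step (not Beam API)."""

    def __init__(self, max_buffer_size: int, sink: Callable[[List[dict]], None]):
        self.max_buffer_size = max_buffer_size
        self.sink = sink              # hypothetical stand-in for the BigQuery insert
        self.buffer: List[dict] = []  # models the bag state
        self.count = 0                # models the count state

    def process(self, element: dict) -> None:
        # Buffer the element and flush once the batch is full.
        self.buffer.append(element)
        self.count += 1
        if self.count >= self.max_buffer_size:
            self.flush()

    def flush(self) -> None:
        # Read the whole buffer, write it as one batch, then clear the state.
        if not self.buffer:
            return
        rows = list(self.buffer)
        self.sink(rows)
        self.buffer.clear()
        self.count = 0
```

For example, with max_buffer_size=3 and seven elements, process() emits two full batches and a final flush() emits the remaining single element.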

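I have confirmed that this error occurs when work is reassigned, for example during autoscaling. The work item is retried on the new machine and the pipeline continues to process correctly. (I agree that the error message could be improved.)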
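One way to see why the retry is safe: assuming, as in Dataflow's streaming model, that state changes made while processing a work item are committed together with that work item, a failed attempt leaves the committed state untouched and the retry re-reads the same buffer. The Beam-free model below assumes exactly that; run_work_item, run_with_retries and WorkTokenInvalid are hypothetical helpers, not runner code.

```python
from typing import Callable, Dict, List


class WorkTokenInvalid(RuntimeError):
    """Hypothetical stand-in for the runner error seen in the trace."""


def run_work_item(committed_state: Dict[str, List[int]], key: str,
                  new_elements: List[int],
                  process: Callable[[List[int], List[int]], List[int]]) -> None:
    """Process one work item against a private copy of the key's state."""
    working_copy = list(committed_state.get(key, []))
    result = process(working_copy, new_elements)
    committed_state[key] = result  # "commit" happens only if process() returned


def run_with_retries(committed_state: Dict[str, List[int]], key: str,
                     new_elements: List[int],
                     process: Callable[[List[int], List[int]], List[int]],
                     max_attempts: int = 3) -> int:
    """Retry a work item; return the attempt number that succeeded."""
    for attempt in range(1, max_attempts + 1):
        try:
            run_work_item(committed_state, key, new_elements, process)
            return attempt
        except WorkTokenInvalid:
            continue  # e.g. work reassigned during autoscaling
    raise RuntimeError("work item failed on every attempt")
```

The model also shows a limit of this guarantee: only state is rolled back, so an external side effect such as a BigQuery insert that completed before a failure could be sent again on retry.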