Apache Beam stateful ParDo Work token invalid
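I have a stateful DoFn that batches incoming elements: when the buffer reaches a certain size, it is flushed and the elements are inserted into BigQuery. I noticed that the pipeline sometimes raises an exception, although the exception does not stop the job from running. Here is the stack trace: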
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 140, in process
    self._flush_buffer(buffer_state, count_state, buffer_size_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 162, in _flush_buffer
    rows = self._extract_rows(buffer_state)
  File "/usr/local/lib/python3.7/site-packages/gp/pipelines/common/writer_transforms.py", line 197, in _extract_rows
    for row in buffer.read():
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 510, in __iter__
    for elem in self.first:
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 846, in get_raw
    continuation_token=continuation_token)))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 886, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: INTERNAL: Work token invalid
The error is raised when the process method is called and tries to read the elements out of the buffer, at the line rows = self._extract_rows(buffer_state).
The DoFn implementation is exactly the same as the example at https://beam.apache.org/blog/timely-processing/#example-batched-rpc
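For readers unfamiliar with the batched-RPC pattern, here is a plain-Python model of the per-key batch-and-flush logic. It is not Beam code: the dicts `buffer` and `count` stand in for a BagState and a counting CombiningValueState, and `BatchingModel`, `MAX_BUFFER_SIZE` and `write_rows` (standing in for the BigQuery insert) are hypothetical names chosen for illustration. The `flush` step corresponds to where the question's `buffer.read()` call raised.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List

# Hypothetical threshold; the question only says "a certain size".
MAX_BUFFER_SIZE = 3


class BatchingModel:
    """Plain-Python model of a stateful batching DoFn (not the Beam API).

    buffer[key] plays the role of a BagState and count[key] the role of a
    CombiningValueState holding a running count, both scoped per key.
    """

    def __init__(self, write_rows: Callable[[List[dict]], None],
                 max_buffer_size: int = MAX_BUFFER_SIZE) -> None:
        self.write_rows = write_rows  # hypothetical stand-in for the BigQuery insert
        self.max_buffer_size = max_buffer_size
        self.buffer: Dict[Hashable, List[dict]] = defaultdict(list)
        self.count: Dict[Hashable, int] = defaultdict(int)

    def process(self, key: Hashable, row: dict) -> None:
        # Buffer the element and bump the per-key count.
        self.buffer[key].append(row)
        self.count[key] += 1
        if self.count[key] >= self.max_buffer_size:
            self.flush(key)

    def flush(self, key: Hashable) -> None:
        # Read the whole buffer (in Beam, the buffer state's read() call,
        # which is where "Work token invalid" surfaced), write it, then clear.
        rows = list(self.buffer[key])
        if rows:
            self.write_rows(rows)
        self.buffer[key].clear()
        self.count[key] = 0


# Usage: batches of 3, then flush the remainder (e.g. when a timer fires).
batches: List[List[dict]] = []
model = BatchingModel(batches.append, max_buffer_size=3)
for i in range(7):
    model.process("user-1", {"id": i})
model.flush("user-1")
```

After the loop, two full batches have been written and the seventh element is still buffered; the final `flush` writes it as a batch of one.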
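I have confirmed that this error occurs during work reassignment, for example during autoscaling. The work item is retried on a new machine, and the pipeline continues to process correctly. (I agree the error message could be improved.)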
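Why a retry on another machine is safe can be illustrated with a simplified model. It assumes, as a simplification of how a streaming runner such as Dataflow behaves, that state mutations made by a failed work item are discarded and the retry starts from the last committed state. `TransientWorkError`, `run_work_item`, `run_with_retries` and the rest are hypothetical names, not Beam or Dataflow APIs.

```python
import copy
from typing import Callable, Dict, List, Tuple

State = Dict[str, List[int]]


class TransientWorkError(RuntimeError):
    """Hypothetical stand-in for a runner-side failure such as
    'INTERNAL: Work token invalid'."""


def run_work_item(committed: State, elements: List[int],
                  process: Callable[[State, int], None]) -> State:
    # Work on a private copy of the state; nothing is committed unless
    # every element in the work item is processed without raising.
    working = copy.deepcopy(committed)
    for element in elements:
        process(working, element)
    return working


def run_with_retries(committed: State, elements: List[int],
                     process: Callable[[State, int], None],
                     max_attempts: int = 3) -> Tuple[State, int]:
    for attempt in range(1, max_attempts + 1):
        try:
            # On success, the returned state becomes the new committed state.
            return run_work_item(committed, elements, process), attempt
        except TransientWorkError:
            # Partial state mutations are dropped; the retry (in the real
            # system, possibly on another worker) starts from committed state.
            continue
    raise RuntimeError("work item failed on every attempt")


# Usage: the first attempt fails while flushing, the retry succeeds.
written: List[List[int]] = []  # stand-in for rows inserted into BigQuery
failures_left = [1]            # hypothetical: fail only the first attempt


def process(state: State, element: int) -> None:
    state.setdefault("buffer", []).append(element)
    if len(state["buffer"]) >= 2:
        if failures_left[0]:
            failures_left[0] -= 1
            # Raised before the write, like the failing read in the trace.
            raise TransientWorkError("INTERNAL: Work token invalid")
        written.append(list(state["buffer"]))
        state["buffer"] = []


committed, attempts = run_with_retries({"buffer": [0]}, [1, 2], process)
print(written, committed, attempts)  # [[0, 1]] {'buffer': [2]} 2
```

The model only rolls back state, not external side effects: if a failure happened after the insert, the retry would write the same rows again. In the stack trace from the question, the failure occurs while reading the buffer, before any rows are sent.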