Python Bigtable Dataflow - Can't pickle <class 'Mutation'>
I'm writing a Dataflow pipeline with Apache Beam to add large batches of rows to Bigtable.
apache-beam==2.24.0
google-cloud-bigtable==2.4.0
In the pipeline, I use the following DoFn to create Bigtable rows before writing them to Bigtable:
import apache_beam as beam


class CreateBigtableRow(beam.DoFn):
    def __init__(self, settings):
        super().__init__()
        self.column_family = settings["bigtable_column_family"]

    def process(self, usage_data, *args, **kwargs):
        # BigTable is a helper class (create_row_and_assign_values is shown below);
        # key_order is defined outside this snippet.
        row_key = BigTable.generate_row_key(usage_data, key_order)
        yield BigTable.create_row_and_assign_values(
            row_key, usage_data, self.column_family
        )
where `create_row_and_assign_values` is defined as:
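import struct

from google.cloud.bigtable.row import DirectRow


class BigTable:
    # generate_row_key(...) and other helpers omitted

    @classmethod
    def create_row_and_assign_values(
        cls, key: str, row: dict, column_family: str
    ) -> DirectRow:
        table_row = DirectRow(key.encode())
        for column, val in row.items():
            if isinstance(val, float):
                # store floats as 8-byte big-endian IEEE 754 doubles
                val = struct.pack(">d", val)
            table_row.set_cell(column_family, column.encode(), val)
        return table_row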
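The float branch above stores each value as an 8-byte big-endian IEEE 754 double. Below is a minimal, stdlib-only sketch of that encoding plus the inverse you would need when reading a cell back. The helper names `encode_cell_value` and `decode_float_cell` are hypothetical, and storing strings as UTF-8 is an assumption of this sketch, not necessarily what the Bigtable client does with `str` values.

```python
import struct
from typing import Union


def encode_cell_value(val: Union[bytes, float, str]) -> bytes:
    """Hypothetical helper: convert a row value into the bytes stored in a cell."""
    if isinstance(val, float):
        # Same encoding as create_row_and_assign_values: 8-byte big-endian double.
        return struct.pack(">d", val)
    if isinstance(val, str):
        # Assumption of this sketch: text is stored as UTF-8.
        return val.encode("utf-8")
    return val  # already bytes


def decode_float_cell(raw: bytes) -> float:
    """Inverse of the float branch above, e.g. for values read back from Bigtable."""
    (value,) = struct.unpack(">d", raw)
    return value


if __name__ == "__main__":
    encoded = encode_cell_value(1.5)
    print(len(encoded), encoded.hex())  # 8 3ff8000000000000
    print(decode_float_cell(encoded))   # 1.5
```

Because the byte order is fixed (`>`), the stored bytes do not depend on the architecture of the worker that wrote them.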
My pipeline looks like this:
import csv

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.gcp.bigtableio import WriteToBigTable

with beam.Pipeline(options=pipeline_options) as pipe:
    (
        pipe
        | beam.Create(["/sample_files/*combined*"])  # glob pattern for the sample CSV files
        | fileio.MatchAll()
        | fileio.ReadMatches()
        | beam.FlatMap(
            lambda file: csv.DictReader(open(file.metadata.path))
        )
        | "Transform to Usage dict" >> beam.ParDo(TransformToBigtableData())
        | "Create Bigtable Row" >> beam.ParDo(CreateBigtableRow(bigtable_settings))
        | WriteToBigTable(
            project_id=bigtable_settings["bigtable_project"],
            instance_id=bigtable_settings["bigtable_instance"],
            table_id=bigtable_settings["bigtable_table"],
        )
    )
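The problem is that running the pipeline fails with this error:
_pickle.PicklingError: Can't pickle <class 'Mutation'>: attribute lookup Mutation on __main__ failed [while running 'Create Bigtable Row']
I have added a step that batches the records manually with the Bigtable Client from the google-cloud-bigtable library, but I would rather use the built-in WriteToBigTable transform, since it handles all of that for me.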
Full stack trace:
Traceback (most recent call last):
File "/app/src/ingest/main.py", line 226, in <module>
run(
File "/app/src/ingest/main.py", line 149, in run
(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 596, in __exit__
self.result = self.run()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 546, in run
return Pipeline.from_runner_api(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/pipeline.py", line 573, in run
return self.runner.run_pipeline(self, self._options)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 195, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 384, in run_stages
stage_results = self._run_stage(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 646, in _run_stage
self._run_bundle(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 769, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
response = self.worker.do_instruction(request)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
return getattr(self, request_type)(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
self.output(decoded_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 354, in output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 216, in receive
self.consumer.process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 714, in process
delayed_applications = self.dofn_runner.process(o)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1235, in process
self._reraise_augmented(exn)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1316, in _reraise_augmented
raise new_exn.with_traceback(tb)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1233, in process
return self.do_fn_invoker.invoke_process(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 571, in invoke_process
self.output_processor.process_outputs(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1396, in process_outputs
self.main_receivers.receive(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 215, in receive
self.update_counters_start(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/operations.py", line 179, in update_counters_start
self.opcounter.update_from(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/opcounters.py", line 211, in update_from
self.do_sample(windowed_value)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/runners/worker/opcounters.py", line 250, in do_sample
self.coder_impl.get_estimated_size_and_observables(windowed_value))
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 1371, in get_estimated_size_and_observables
self._value_coder.get_estimated_size_and_observables(
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 358, in get_estimated_size_and_observables
self.encode_to_stream(value, out, nested)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 422, in encode_to_stream
self.fallback_coder_impl.encode_to_stream(value, stream, nested)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coder_impl.py", line 262, in encode_to_stream
return stream.write(self._encoder(value), nested)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 800, in <lambda>
lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'Mutation'>: attribute lookup Mutation on __main__ failed [while running 'Create Bigtable Row']
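The bottom frames show where this happens: while sampling element sizes (`update_counters_start` → `do_sample`), Beam encodes the `DirectRow` emitted by "Create Bigtable Row", falls back to a pickle-based coder, and pickling fails on a `Mutation` class inside the row. Pickle stores a class by reference (module plus qualified name), so a class it cannot look up again by that name cannot be pickled. Below is a minimal stdlib-only reproduction of that failure mode. The `Mutation` here is a stand-in built with `type()`, not the library's class, and the idea that google-cloud-bigtable 2.x's `Mutation` type fails for the same reason is an inference from the error message, not something confirmed here.

```python
import pickle


def make_unimportable_class():
    """Build a stand-in 'Mutation' class that pickle cannot look up again."""
    # __module__=None and no module-level binding: pickle falls back to
    # looking for __main__.Mutation, which does not exist.
    return type("Mutation", (), {"__module__": None})


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def demo() -> None:
    # Keep the class in a local variable so it is not reachable as __main__.Mutation.
    mutation_cls = make_unimportable_class()
    print(repr(mutation_cls))  # <class 'Mutation'>
    try:
        pickle.dumps(mutation_cls())
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # Exact wording differs between Python versions.
        print(type(exc).__name__ + ":", exc)
    # Plain data, e.g. a (row_key, {column: value}) tuple, pickles fine.
    print(is_picklable((b"row#1", {b"col": b"\x00"})))  # True


if __name__ == "__main__":
    demo()
```

A helper like `is_picklable` can be run on a sample element in a unit test to find out whether a step's output will survive Beam's pickle fallback before launching the pipeline.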
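Your google-cloud-bigtable version is too new.
There is some ongoing work on updating the apache-beam dependencies here,
and they ran into the same problem. Can you roll google-cloud-bigtable back to a version below 2? If you run
pip install "apache-beam[gcp]"
it will install the recommended versions.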
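After rolling back, a small start-up check can catch an accidental upgrade before a long pipeline run. This is a minimal sketch using the standard library's `importlib.metadata` (Python 3.8+); the function names are hypothetical, and the `< 2` threshold simply encodes the "below 2" advice above.

```python
import re
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    """Leading integer of a version string, e.g. '2.4.0' -> 2."""
    match = re.match(r"\d+", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return int(match.group())


def installed_version(dist: str) -> Optional[str]:
    """Installed version of a distribution, or None if it is not installed."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None


def check_bigtable_below(max_major: int = 2, dist: str = "google-cloud-bigtable") -> None:
    """Raise early if the installed client's major version is >= max_major."""
    version = installed_version(dist)
    # If the package is missing entirely, the pipeline's own imports will fail anyway.
    if version is not None and major_version(version) >= max_major:
        raise RuntimeError(
            f"{dist}=={version} is installed; this pipeline expects a version "
            f"below {max_major}.0"
        )
```

Calling `check_bigtable_below()` at the top of `run()`, before the pipeline is built, turns the late pickling failure into an immediate, explicit error.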