Dataflow pipeline raises PicklingError when returning an instance of my own class from a ParDo

I have a pipeline that looks like this:

import base64
import gzip
import logging
import apache_beam as beam

import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError

class GetClearMessage(beam.DoFn):
    def process(self, element, **kwargs):
        """ Parse encoded proto 
        Returns an instance of EntryPoint decoded.
        """
        logging.info('Unserializing data')
        logging.info(element)
        batch_entry_point = common_pb2.BatchEntryPoint()
        data = element.data
        logging.info(data)
        try:
            batch_entry_point.ParseFromString(data)
        except DecodeError:
            unzipped = gzip.decompress(data)
            batch_entry_point.ParseFromString(unzipped)
        logging.info(batch_entry_point)
        return [batch_entry_point]

def batch_pipeline(pipeline):
    console_message = (
        pipeline
        | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
            subscription='projects/production-213911/subscriptions/ps-to-bq-console',
            with_attributes=True)
    )

    clear_message = console_message | beam.ParDo(GetClearMessage())
    gcloud_id = console_message | beam.ParDo(GetGcloudId())
    registry = console_message | beam.ParDo(GetTableData())
    #clear_message | beam.ParDo(Test())

I removed some classes here because they are not necessary to understand the problem.

When I run my Dataflow pipeline, I regularly get this error:

Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-4468']

See the full stack trace below.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
        response = task()
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
        lambda: self.create_worker().do_instruction(request), request)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
        getattr(request, request_type), request.instruction_id)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
        bundle_processor.process_bundle(instruction_id))
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
        element.data)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
        self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
        raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
      File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
      File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
      File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
      File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
      File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
        lambda x: dumps(x, protocol), pickle.loads)
    _pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-314']

But as you can see in GetClearMessage, I log some data, and when I look at the logs for this particular step everything seems fine: the batch_entry_point I log is a proper instance of BatchEntryPoint, the class causing the trouble.
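
To illustrate what I think is happening: judging from the traceback, the error is raised by the coder that encodes the DoFn's output for the next step, i.e. after process() has already returned, which would explain why the logging inside process() is fine. Here is a minimal sketch of what the fallback coder seems to do with the element (based only on the coders.py lines in the traceback, not on the actual Beam internals):

    import pickle

    import data.build.common.v1.common_pb2 as common_pb2

    # Roughly what the fallback coder does between steps: the element is
    # serialized with the standard pickle module.
    msg = common_pb2.BatchEntryPoint()
    encoded = pickle.dumps(msg, pickle.HIGHEST_PROTOCOL)
    # pickle stores the class by reference, so it has to re-import the module
    # name recorded on the class -- here 'common.v1.common_pb2' -- and that
    # import apparently fails on the Dataflow workers.
    decoded = pickle.loads(encoded)

If that is right, the message itself is fine, and the problem is only that 'common.v1.common_pb2' cannot be imported where the element is pickled.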

Any idea what could cause this behaviour?


EDIT

I tried to add a step on the result of ParDo(GetClearMessage), but that step is never reached. So I guess the pickling error happens because I am returning an instance of BatchEntryPoint.

I don't understand this behaviour. Do you know how to solve it?
Thanks
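
For what it's worth, my current guess is that Beam falls back to pickling because it has no type information for the output of GetClearMessage. Something along these lines is what I am considering trying, i.e. registering a proto-aware coder and giving the step an output type hint (just a sketch, I have not verified that this avoids the pickle fallback):

    import apache_beam as beam
    import data.build.common.v1.common_pb2 as common_pb2

    # Ask Beam to use a protobuf coder for BatchEntryPoint instead of the
    # pickle fallback (not verified).
    beam.coders.registry.register_coder(
        common_pb2.BatchEntryPoint, beam.coders.ProtoCoder)

    # Output type hint so the step's coder can be resolved from the registry.
    clear_message = (
        console_message
        | beam.ParDo(GetClearMessage()).with_output_types(common_pb2.BatchEntryPoint)
    )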

I did not solve the issue, but I found a workaround: instead of returning batch_entry_point, I yield each element contained in it, like this:

        for i in batch_entry_point.entrypoints:
            logging.info(i)
            obj['proto'] = i
            yield obj

Then, the next step of the pipeline handles each of these elements.
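
For completeness, that next step looks roughly like this (a simplified, hypothetical version of my real DoFn, just to show how the dicts are consumed):

    class HandleEntryPoint(beam.DoFn):
        # Hypothetical downstream step: name and processing are placeholders.
        def process(self, element, **kwargs):
            entrypoint = element['proto']  # single entrypoint yielded above
            logging.info(entrypoint)
            # ...actual handling of the entrypoint goes here...
            yield element

    handled = clear_message | beam.ParDo(HandleEntryPoint())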