如何在 Python 中处理 Apache Beam 管道(Pipeline)中的异常?

How can I handle exceptions inside an Apache Beam pipeline in Python?

当文本被识别出的语言不受支持时，Google NLP API 会返回错误，因此我的管道在处理某个元素时失败了。我想在管道内部处理这个异常，同时仍然利用 Apache Beam 高效的批量(batch)请求，但不知道该怎么做。



# Natural Language API features requested for every document: entity
# extraction and overall document sentiment; syntax analysis is switched off.
features = nlp.types.AnnotateTextRequest.Features(
    **{
        "extract_entities": True,
        "extract_document_sentiment": True,
        "extract_syntax": False,
    }
)

# Pipeline: raw message text -> nlp.Document -> AnnotateText API call
# -> parse_nlp_result -> a single text file in the GCS bucket.
p = beam.Pipeline()
documents = (
    p
    | beam.Create(message_log_df['message'])
    | beam.Map(lambda text: nlp.Document(text, type='PLAIN_TEXT'))
)
output_path = 'gs://{}/all.txt'.format(BUCKET_NAME)
(
    documents
    | nlp.AnnotateText(features)
    | beam.Map(parse_nlp_result)
    | beam.io.WriteToText(output_path, num_shards=1)
)

# Execute the pipeline and block until it completes. An exception raised in
# any transform (e.g. an API error inside AnnotateText) propagates out of
# these calls.
result = p.run()
result.wait_until_finish()

我需要处理的异常:

_InactiveRpcError                         Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     66         try:
---> 67             return callable_(*args, **kwargs)
     68         except grpc.RpcError as exc:

/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    945                                       wait_for_ready, compression)
--> 946         return _end_unary_response_blocking(state, call, False, None)
    947 

/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
    848     else:
--> 849         raise _InactiveRpcError(state)
    850 

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "The language nl is not supported for entity analysis."
    debug_error_string = "{"created":"@1634485423.983876222","description":"Error received from peer ipv4:172.217.212.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"The language nl is not supported for entity analysis.","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

InvalidArgument                           Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
    136   def process(self, element):
--> 137     response = self.client.annotate_text(
    138         document=Document.to_dict(element),

/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
    575         )
--> 576         return self._inner_api_calls["annotate_text"](
    577             request, retry=retry, timeout=timeout, metadata=metadata

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
    285             )
--> 286             return retry_target(
    287                 target,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
    188         try:
--> 189             return target()
    190 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     68         except grpc.RpcError as exc:
---> 69             six.raise_from(exceptions.from_grpc_error(exc), exc)
     70 

/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)

InvalidArgument: 400 The language nl is not supported for entity analysis.

During handling of the above exception, another exception occurred:

InvalidArgument                           Traceback (most recent call last)
<ipython-input-121-8497e34517c1> in <module>
      7  | beam.io.WriteToText('gs://{}/whatsapp-all.txt'.format(BUCKET_NAME), num_shards=1)
      8 )
----> 9 result = p.run()
     10 result.wait_until_finish()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    563         finally:
    564           shutil.rmtree(tmpdir)
--> 565       return self.runner.run_pipeline(self, self._options)
    566     finally:
    567       shutil.rmtree(self.local_tempdir, ignore_errors=True)

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    193         options.view_as(pipeline_options.ProfilingOptions))
    194 
--> 195     self._latest_run_result = self.run_via_runner_api(
    196         pipeline.to_runner_api(default_environment=self._default_environment))
    197     return self._latest_run_result

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    204     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    205     #   the teststream (if any), and all the stages).
--> 206     return self.run_stages(stage_context, stages)
    207 
    208   @contextlib.contextmanager

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    382           )
    383 
--> 384           stage_results = self._run_stage(
    385               runner_execution_context, bundle_context_manager)
    386 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    644     while True:
    645       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 646           self._run_bundle(
    647               runner_execution_context,
    648               bundle_context_manager,

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    767         expected_timer_output)
    768 
--> 769     result, splits = bundle_manager.process_bundle(
    770         data_input, data_output, input_timers, expected_timer_output)
    771     # Now we collect all the deferred inputs remaining from bundle execution.

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1078             process_bundle_descriptor.id,
   1079             cache_tokens=[next(self._cache_token_generator)]))
-> 1080     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1081 
   1082     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    376       self._uid_counter += 1
    377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
    379     return ControlFuture(request.instruction_id, response)
    380 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    600     if request_type:
    601       # E.g. if register is set, this will call self.register(request.register))
--> 602       return getattr(self, request_type)(
    603           getattr(request, request_type), request.instruction_id)
    604     else:

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    638         with self.maybe_profile(instruction_id):
    639           delayed_applications, requests_finalization = (
--> 640               bundle_processor.process_bundle(instruction_id))
    641           monitoring_infos = bundle_processor.monitoring_infos()
    642           monitoring_infos.extend(self.state_cache_metrics_fn())

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    994                   element.timer_family_id, timer_data)
    995           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 996             input_op_by_transform_id[element.transform_id].process_encoded(
    997                 element.data)
    998 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    220       decoded_value = self.windowed_coder_impl.decode_from_stream(
    221           input_stream, True)
--> 222       self.output(decoded_value)
    223 
    224   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
    135 
    136   def process(self, element):
--> 137     response = self.client.annotate_text(
    138         document=Document.to_dict(element),
    139         features=self.features,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
    574             document=document, features=features, encoding_type=encoding_type
    575         )
--> 576         return self._inner_api_calls["annotate_text"](
    577             request, retry=retry, timeout=timeout, metadata=metadata
    578         )

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
    143             kwargs["metadata"] = metadata
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 
    147 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
    284                 self._initial, self._maximum, multiplier=self._multiplier
    285             )
--> 286             return retry_target(
    287                 target,
    288                 self._predicate,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
    187     for sleep in sleep_generator:
    188         try:
--> 189             return target()
    190 
    191         # pylint: disable=broad-except

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     67             return callable_(*args, **kwargs)
     68         except grpc.RpcError as exc:
---> 69             six.raise_from(exceptions.from_grpc_error(exc), exc)
     70 
     71     return error_remapped_callable

/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)

InvalidArgument: 400 The language nl is not supported for entity analysis. [while running '[121]: AnnotateText(extract_entities: true
extract_document_sentiment: true
)/ParDo(_AnnotateTextFn)']

我使用的教程参考:https://medium.com/google-cloud/calling-google-cloud-machine-learning-apis-from-batch-and-stream-etl-pipelines-9a789ac6f972

我查看了 Apache Beam 源代码中负责 GCP API 调用的文件(this file)，找到了一个变通方法。问题在于，这层抽象没有提供任何处理 API 错误的机制，因此只要 API 报错，整个管道就会失败。

一个简单的解决方法是重写几个关键的类/函数：负责发起 API 调用的 _AnnotateTextFn 类，以及使用 _AnnotateTextFn 的 AnnotateText 函数(让它改用我们自己实现的、能够处理 API 异常的版本)。

class _AnnotateTextFn_Custom(nlp._AnnotateTextFn):
    """``nlp._AnnotateTextFn`` variant that tolerates per-element API errors.

    Instead of letting an exception from ``annotate_text`` (for example
    ``InvalidArgument: 400 The language nl is not supported for entity
    analysis.``) fail the whole pipeline, the error is logged, counted in the
    ``api_errors`` metric, and an empty ``AnnotateTextResponse`` is emitted for
    that element so downstream transforms keep running.
    """

    def __init__(
        self,
        features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
        timeout,  # type: Optional[float]
        metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
    ):
        # Local import: apache_beam is already a dependency of this file, and
        # importing here keeps the DoFn self-contained when pickled to workers.
        from apache_beam.metrics import Metrics

        # Name this class explicitly so the parent initializer runs and sets
        # features/timeout/metadata and the api_calls counter used in process().
        super(_AnnotateTextFn_Custom, self).__init__(features, timeout, metadata)
        # Counts elements whose API call failed and were replaced by an empty
        # response, so swallowed errors remain visible in pipeline metrics.
        self.api_errors = Metrics.counter(self.__class__.__name__, 'api_errors')

    def process(self, element):
        """Annotate one document, yielding an empty response if the API call fails.

        Args:
          element: the ``nlp.Document`` to send to the Natural Language API.

        Yields:
          The ``AnnotateTextResponse`` returned by the API for *element*, or an
          empty ``nlp.types.AnnotateTextResponse()`` when the call raised.
        """
        import logging

        # Build the request inputs outside the try so that a malformed element
        # (a programming error) still fails loudly instead of being masked as
        # an API error.
        document = nlp.Document.to_dict(element)
        encoding_type = element.encoding

        try:
            response = self.client.annotate_text(
                document=document,
                features=self.features,
                encoding_type=encoding_type,
                timeout=self.timeout,
                metadata=self.metadata)
        except Exception as e:  # per-element boundary: don't fail the pipeline
            logging.getLogger(__name__).warning(
                'annotate_text failed; emitting empty response: %s', e)
            self.api_errors.inc()
            response = nlp.types.AnnotateTextResponse()

        # An API call was attempted either way, so it is counted either way.
        self.api_calls.inc()

        yield response

@beam.ptransform_fn
def AnnotateText(
    pcoll,  # type: beam.pvalue.PCollection
    features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
    timeout=None,  # type: Optional[float]
    metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
):
    """Annotate text with the Google Cloud Natural Language API.

    A :class:`~apache_beam.transforms.ptransform.PTransform` that mirrors
    ``nlp.AnnotateText`` (https://cloud.google.com/natural-language/docs)
    but applies ``_AnnotateTextFn_Custom``, which catches exceptions from the
    API call and emits an empty response instead of failing the pipeline.

    Args:
      pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of
        :class:`Document` objects.
      features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`):
        A dictionary of natural language operations to be performed on the
        given text, in the following format::

          {'extract_syntax': True, 'extract_entities': True}

      timeout (`Optional[float]`): The amount of time, in seconds, to wait
        for the request to complete. The timeout applies to each individual
        retry attempt.
      metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
        that is provided to the method.

    Returns:
      A PCollection containing one ``AnnotateTextResponse`` per input document.
    """
    # Use the error-tolerant DoFn instead of the stock nlp._AnnotateTextFn.
    return pcoll | beam.ParDo(_AnnotateTextFn_Custom(features, timeout, metadata))

然后就可以在管道中用这个新的 AnnotateText 替换原来的 nlp.AnnotateText，例如 `| AnnotateText(features)`。