How can I handle an exception using an Apache Beam pipeline in Python?
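The Google NLP API returns an error when the language detected for a text is not supported, so my pipeline fails on a single element. I am having trouble handling this exception inside the pipeline so that I can still take advantage of Apache Beam's efficient batch requests.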
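import apache_beam as beam
# Assumption: `nlp` is Beam's Natural Language module, which also exposes `types`.
from apache_beam.ml.gcp import naturallanguageml as nlp

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_syntax=False
)

# message_log_df, parse_nlp_result and BUCKET_NAME are defined elsewhere.
p = beam.Pipeline()
(p
 | beam.Create(message_log_df['message'])
 | beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
 | nlp.AnnotateText(features)
 | beam.Map(parse_nlp_result)
 | beam.io.WriteToText('gs://{}/all.txt'.format(BUCKET_NAME), num_shards=1)
)
result = p.run()
result.wait_until_finish()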
The exception I need to handle:
_InactiveRpcError Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
66 try:
---> 67 return callable_(*args, **kwargs)
68 except grpc.RpcError as exc:
/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
945 wait_for_ready, compression)
--> 946 return _end_unary_response_blocking(state, call, False, None)
947
/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
848 else:
--> 849 raise _InactiveRpcError(state)
850
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "The language nl is not supported for entity analysis."
debug_error_string = "{"created":"@1634485423.983876222","description":"Error received from peer ipv4:172.217.212.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"The language nl is not supported for entity analysis.","grpc_status":3}"
>
The above exception was the direct cause of the following exception:
InvalidArgument Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
136 def process(self, element):
--> 137 response = self.client.annotate_text(
138 document=Document.to_dict(element),
/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
575 )
--> 576 return self._inner_api_calls["annotate_text"](
577 request, retry=retry, timeout=timeout, metadata=metadata
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
144
--> 145 return wrapped_func(*args, **kwargs)
146
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
285 )
--> 286 return retry_target(
287 target,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
188 try:
--> 189 return target()
190
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
68 except grpc.RpcError as exc:
---> 69 six.raise_from(exceptions.from_grpc_error(exc), exc)
70
/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)
InvalidArgument: 400 The language nl is not supported for entity analysis.
During handling of the above exception, another exception occurred:
InvalidArgument Traceback (most recent call last)
<ipython-input-121-8497e34517c1> in <module>
7 | beam.io.WriteToText('gs://{}/whatsapp-all.txt'.format(BUCKET_NAME), num_shards=1)
8 )
----> 9 result = p.run()
10 result.wait_until_finish()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
563 finally:
564 shutil.rmtree(tmpdir)
--> 565 return self.runner.run_pipeline(self, self._options)
566 finally:
567 shutil.rmtree(self.local_tempdir, ignore_errors=True)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
193 options.view_as(pipeline_options.ProfilingOptions))
194
--> 195 self._latest_run_result = self.run_via_runner_api(
196 pipeline.to_runner_api(default_environment=self._default_environment))
197 return self._latest_run_result
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
204 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
205 # the teststream (if any), and all the stages).
--> 206 return self.run_stages(stage_context, stages)
207
208 @contextlib.contextmanager
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
382 )
383
--> 384 stage_results = self._run_stage(
385 runner_execution_context, bundle_context_manager)
386
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
644 while True:
645 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 646 self._run_bundle(
647 runner_execution_context,
648 bundle_context_manager,
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
767 expected_timer_output)
768
--> 769 result, splits = bundle_manager.process_bundle(
770 data_input, data_output, input_timers, expected_timer_output)
771 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1078 process_bundle_descriptor.id,
1079 cache_tokens=[next(self._cache_token_generator)]))
-> 1080 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1081
1082 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
600 if request_type:
601 # E.g. if register is set, this will call self.register(request.register))
--> 602 return getattr(self, request_type)(
603 getattr(request, request_type), request.instruction_id)
604 else:
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
638 with self.maybe_profile(instruction_id):
639 delayed_applications, requests_finalization = (
--> 640 bundle_processor.process_bundle(instruction_id))
641 monitoring_infos = bundle_processor.monitoring_infos()
642 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
994 element.timer_family_id, timer_data)
995 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 996 input_op_by_transform_id[element.transform_id].process_encoded(
997 element.data)
998
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
220 decoded_value = self.windowed_coder_impl.decode_from_stream(
221 input_stream, True)
--> 222 self.output(decoded_value)
223
224 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
135
136 def process(self, element):
--> 137 response = self.client.annotate_text(
138 document=Document.to_dict(element),
139 features=self.features,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
574 document=document, features=features, encoding_type=encoding_type
575 )
--> 576 return self._inner_api_calls["annotate_text"](
577 request, retry=retry, timeout=timeout, metadata=metadata
578 )
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
143 kwargs["metadata"] = metadata
144
--> 145 return wrapped_func(*args, **kwargs)
146
147
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
284 self._initial, self._maximum, multiplier=self._multiplier
285 )
--> 286 return retry_target(
287 target,
288 self._predicate,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
187 for sleep in sleep_generator:
188 try:
--> 189 return target()
190
191 # pylint: disable=broad-except
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
67 return callable_(*args, **kwargs)
68 except grpc.RpcError as exc:
---> 69 six.raise_from(exceptions.from_grpc_error(exc), exc)
70
71 return error_remapped_callable
/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)
InvalidArgument: 400 The language nl is not supported for entity analysis. [while running '[121]: AnnotateText(extract_entities: true
extract_document_sentiment: true
)/ParDo(_AnnotateTextFn)']
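I found a workaround by inspecting how the GCP API call is made in this file of the Apache Beam source code. The abstraction has no facility for handling errors returned by the API, so a single error makes the whole pipeline fail.
A simple workaround is to override a few key classes/functions: the _AnnotateTextFn object, which makes the API call, and the AnnotateText function, which uses _AnnotateTextFn (so that it uses our version, which handles the API exception).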
class _AnnotateTextFn_Custom(nlp._AnnotateTextFn):
    """Variant of Beam's _AnnotateTextFn that does not fail on API errors."""

    def __init__(
        self,
        features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
        timeout,  # type: Optional[float]
        metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
    ):
        super(_AnnotateTextFn_Custom, self).__init__(features, timeout, metadata)

    def process(self, element):
        try:
            response = self.client.annotate_text(
                document=nlp.Document.to_dict(element),
                features=self.features,
                encoding_type=element.encoding,
                timeout=self.timeout,
                metadata=self.metadata)
        except Exception as e:
            # ****Handle the exception here****
            # Fall back to an empty response so the pipeline keeps running.
            response = nlp.types.AnnotateTextResponse()
        self.api_calls.inc()
        yield response
@beam.ptransform_fn
def AnnotateText(
    pcoll,  # type: beam.pvalue.PCollection
    features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
    timeout=None,  # type: Optional[float]
    metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
):
    """A :class:`~apache_beam.transforms.ptransform.PTransform`
    for annotating text using the Google Cloud Natural Language API:
    https://cloud.google.com/natural-language/docs.

    Args:
      pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of
        :class:`Document` objects.
      features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`):
        A dictionary of natural language operations to be performed on given
        text in the following format::

          {'extract_syntax': True, 'extract_entities': True}

      timeout (`Optional[float]`): The amount of time, in seconds, to wait
        for the request to complete. The timeout applies to each individual
        retry attempt.
      metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
        that is provided to the method.
    """
    # ***Here, we use our custom object _AnnotateTextFn_Custom
    return pcoll | beam.ParDo(_AnnotateTextFn_Custom(features, timeout, metadata))
We can then use this new AnnotateText in the pipeline, in place of nlp.AnnotateText.
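What goes into the `except` branch of the custom `process` method above depends on what you want to do with the failed elements. One option is to keep them rather than emit an empty response, so that messages in unsupported languages can be inspected or retried later. The following is a minimal sketch of that idea in plain Python, with no Beam or Google Cloud dependency so that it runs anywhere. The names `safe_process` and `fake_annotate` and the `'ok'`/`'failed'` tags are hypothetical and only for illustration; they are not part of either library.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple


def safe_process(
    call: Callable[[Any], Any], elements: Iterable[Any]
) -> Iterator[Tuple[str, Any]]:
    """Apply `call` to each element, tagging results instead of raising.

    Yields ('ok', result) on success and ('failed', (element, message))
    when `call` raises, so one bad element does not stop the others.
    """
    for element in elements:
        try:
            yield 'ok', call(element)
        except Exception as exc:  # narrow this to the API's error type if known
            yield 'failed', (element, str(exc))


def fake_annotate(text: str) -> dict:
    """Hypothetical stand-in for the API call; rejects one 'language'."""
    if text.startswith('nl:'):
        raise ValueError('The language nl is not supported for entity analysis.')
    return {'text': text, 'length': len(text)}


# Process three messages; the middle one triggers the simulated API error.
results = list(safe_process(fake_annotate, ['en:hello', 'nl:hallo', 'en:bye']))
succeeded = [value for tag, value in results if tag == 'ok']
failed = [value for tag, value in results if tag == 'failed']
```

In a Beam DoFn the same shape should map onto tagged outputs: yield the response on the main output, wrap the failure in `beam.pvalue.TaggedOutput('failed', ...)`, and split the result of `beam.ParDo(...)` with `.with_outputs('failed', main='ok')`. The failed elements can then be written to a separate file instead of being lost.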
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
135
136 def process(self, element):
--> 137 response = self.client.annotate_text(
138 document=Document.to_dict(element),
139 features=self.features,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
574 document=document, features=features, encoding_type=encoding_type
575 )
--> 576 return self._inner_api_calls["annotate_text"](
577 request, retry=retry, timeout=timeout, metadata=metadata
578 )
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
143 kwargs["metadata"] = metadata
144
--> 145 return wrapped_func(*args, **kwargs)
146
147
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
284 self._initial, self._maximum, multiplier=self._multiplier
285 )
--> 286 return retry_target(
287 target,
288 self._predicate,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
187 for sleep in sleep_generator:
188 try:
--> 189 return target()
190
191 # pylint: disable=broad-except
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
67 return callable_(*args, **kwargs)
68 except grpc.RpcError as exc:
---> 69 six.raise_from(exceptions.from_grpc_error(exc), exc)
70
71 return error_remapped_callable
/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)
InvalidArgument: 400 The language nl is not supported for entity analysis. [while running '[121]: AnnotateText(extract_entities: true
extract_document_sentiment: true
)/ParDo(_AnnotateTextFn)']
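I found a workaround by inspecting how the GCP API call is made in the Apache Beam source code (this file, the naturallanguageml.py module that appears in the traceback above). It turns out the abstraction has no mechanism for handling errors returned by the API, so a single failed call makes the whole pipeline fail.
A simple workaround is to override two key pieces: the _AnnotateTextFn object, which makes the API call, and the AnnotateText function, which uses _AnnotateTextFn (so that it uses our version, which handles the API exception).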
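import apache_beam as beam
# Assumed import: the question uses `nlp` as an alias for Beam's Natural Language module.
from apache_beam.ml.gcp import naturallanguageml as nlp


class _AnnotateTextFn_Custom(nlp._AnnotateTextFn):
    """Variant of _AnnotateTextFn that does not fail the pipeline on API errors."""

    def __init__(
        self,
        features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
        timeout,  # type: Optional[float]
        metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
    ):
        super(_AnnotateTextFn_Custom, self).__init__(features, timeout, metadata)

    def process(self, element):
        try:
            response = self.client.annotate_text(
                document=nlp.Document.to_dict(element),
                features=self.features,
                encoding_type=element.encoding,
                timeout=self.timeout,
                metadata=self.metadata)
        except Exception as e:
            # ****Handle the exception here****
            # Fall back to an empty response so downstream steps still receive an element.
            response = nlp.types.AnnotateTextResponse()
        self.api_calls.inc()
        yield response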
@beam.ptransform_fn
def AnnotateText(
    pcoll,  # type: beam.pvalue.PCollection
    features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
    timeout=None,  # type: Optional[float]
    metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
):
    """A :class:`~apache_beam.transforms.ptransform.PTransform`
    for annotating text using the Google Cloud Natural Language API:
    https://cloud.google.com/natural-language/docs.

    Args:
      pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of
        :class:`Document` objects.
      features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`):
        A dictionary of natural language operations to be performed on given
        text in the following format::

          {'extract_syntax': True, 'extract_entities': True}

      timeout (`Optional[float]`): The amount of time, in seconds, to wait
        for the request to complete. The timeout applies to each individual
        retry attempt.
      metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
        that is provided to the method.
    """
    # ***Here, we use our custom object _AnnotateTextFn_Custom
    return pcoll | beam.ParDo(_AnnotateTextFn_Custom(features, timeout, metadata))
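The except branch above is left as a placeholder. As a rough illustration of what could go there, here is a minimal sketch in plain Python (no Beam needed to run it) that logs the failure, records the offending element, and returns a fallback value. The names call_or_record, fake_annotate, and failures are hypothetical and not part of Beam or the NLP client; fake_annotate only stands in for client.annotate_text.

```python
import logging
from typing import Any, Callable, List, Optional, Tuple

logger = logging.getLogger(__name__)


def call_or_record(
    api_call: Callable[[Any], Any],
    element: Any,
    failures: List[Tuple[Any, str]],
    fallback: Optional[Any] = None,
) -> Any:
    """Call api_call(element); on error, log it, record it, and return fallback."""
    try:
        return api_call(element)
    except Exception as exc:  # broad on purpose, mirroring the except clause above
        logger.warning("Skipping element %r: %s", element, exc)
        failures.append((element, str(exc)))
        return fallback


def fake_annotate(text: str) -> dict:
    """Hypothetical stand-in for the API call: rejects texts tagged as Dutch."""
    if text.startswith("nl:"):
        raise ValueError("The language nl is not supported for entity analysis.")
    return {"text": text, "sentiment": 0.0}


failures: List[Tuple[Any, str]] = []
results = [
    call_or_record(fake_annotate, text, failures)
    for text in ["en:hello", "nl:hallo"]
]
```

Inside a real DoFn, a local list like failures would not be shared across workers; a Beam metric counter or a tagged side output is probably the more suitable place to send the failed elements.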
We can then use this new AnnotateText in the pipeline.