Unable to publish Pubsub message in Airflow Python 3
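I am unable to publish using the PubSubHook in Airflow with Python 3. Everything works perfectly with Python 2, but with Python 3 I get this error: {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable. It seems that encoding the message in Python 3 produces bytes that the JSON serializer cannot handle.
The following works fine in Python 2: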
from base64 import b64encode
from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook

def send_message_to_pubsub(message):
    pubsub_message = {'data': b64encode(message)}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])
The example here does not work with Python 3.
Update 1:
I tried the following but got an error:
def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = {'data': b64encode(message).decode()}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test [2019-03-18 17:10:28,903] {models.py:1760} ERROR - a bytes-like object is required, not 'str'
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test Traceback (most recent call last):
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test result = task_copy.execute(context=context)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test return_value = self.execute_callable()
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test return self.python_callable(*self.op_args, **self.op_kwargs)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 31, in send_message_to_pubsub
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test pubsub_message = {'data': b64encode(message).decode()}
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/base64.py", line 58, in b64encode
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test encoded = binascii.b2a_base64(s, newline=False)
{base_task_runner.py:101} INFO - Job 1962: Subtask pub_sub_test TypeError: a bytes-like object is required, not 'str'
Update 2:
I tried the following, which produced a different error, this time from the JSON serializer:
def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = {'data': b64encode(message.encode())}
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,845] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test [2019-03-19 10:44:29,841] {models.py:1760} ERROR - Object of type 'bytes' is not JSON serializable
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test Traceback (most recent call last):
[2019-03-19 10:44:29,846] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test result = task_copy.execute(context=context)
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return_value = self.execute_callable()
[2019-03-19 10:44:29,847] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 33, in send_message_to_pubsub
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,848] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_pubsub_hook.py", line 75, in publish
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test topic=full_topic, body=body)
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/discovery.py", line 795, in method
[2019-03-19 10:44:29,849] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test actual_path_params, actual_query_params, body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 151, in request
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test body_value = self.serialize(body_value)
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 260, in serialize
[2019-03-19 10:44:29,850] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return json.dumps(body_value)
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
[2019-03-19 10:44:29,851] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return _default_encoder.encode(obj)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test chunks = self.iterencode(o, _one_shot=True)
[2019-03-19 10:44:29,853] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
[2019-03-19 10:44:29,854] {base_task_runner.py:101} INFO - Job 2172: Subtask pub_sub_test return _iterencode(o, 0)
[2019-03-19 10:44:29,852] {models.py:1791} INFO - Marking task as FAILED.
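There are two parts to this problem.
- According to the base64 documentation, the input to b64encode must be of type bytes, not str. To verify this, try assert isinstance(message, bytes). With a str message, the assertion fails.
  The solution depends on where your message comes from.
  - If your message is a string, encode it to bytes before passing it to base64:
    b64encode(message.encode())
  - If your message is supposed to be of type bytes, change how it is read into Python so that it arrives as bytes.
- According to the documentation of Python's json module, the bytes type is not supported; values must be of type str. In Python 3, b64encode returns bytes, so anything you send to the PubSub API must first be turned into a string. You can decode the base64 output into a string like this:
pubsub_message = {'data': b64encode(message.encode()).decode()}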
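Putting both steps together, here is a minimal sketch of the fix that runs without Airflow. The helper name build_pubsub_message is hypothetical (it is not part of Airflow or of the question's DAG), and the sketch assumes the message text is UTF-8:

```python
import json
from base64 import b64decode, b64encode


def build_pubsub_message(message):
    """Return a JSON-serializable message dict for str or bytes input.

    Hypothetical helper for illustration; not part of Airflow.
    """
    # b64encode only accepts bytes-like objects, so encode str input first.
    if isinstance(message, str):
        message = message.encode('utf-8')
    # In Python 3, b64encode returns bytes; decode to str so json can serialize it.
    return {'data': b64encode(message).decode('ascii')}


pubsub_message = build_pubsub_message('Test message')

# The json.dumps step that raised the error in Update 2 now succeeds.
serialized = json.dumps(pubsub_message)

# Round trip: decoding the payload recovers the original text.
assert b64decode(pubsub_message['data']).decode('utf-8') == 'Test message'
```

Inside the DAG callable, the resulting dict can then be passed to hook.publish('project-name', 'topic-name', [pubsub_message]) exactly as in the question.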