发布到 pubsub 主题时出现间歇性身份验证错误
Intermittent authentication error when posting to a pubsub topic
我们在 Google Cloud Dataflow 中内置了一个数据管道,它使用来自 pubsub 主题的消息并将它们流式传输到 BigQuery。为了测试它是否成功运行,我们在 CI 管道中进行了一些 运行 测试,这些测试将 post 消息发送到 pubsub 主题,并验证消息是否已成功写入 BigQuery。
这是 post 发布订阅主题的代码:
from google.cloud import pubsub_v1
def post_messages(project_id, topic_id, rows)
futures = dict()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
project_id, topic_id
)
def get_callback(f, data):
def callback(f):
try:
futures.pop(data)
except:
print("Please handle {} for {}.".format(f.exception(), data))
return callback
for row in rows:
# When you publish a message, the client returns a future. Data must be a bytestring
# ...
# construct a message in var json_data
# ...
message = json.dumps(json_data).encode("utf-8")
future = publisher.publish(
topic_path,
message
)
futures_key = str(message)
futures[futures_key] = future
future.add_done_callback(get_callback(future, futures_key))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(1)
当我们 运行 在我们的 CI 管道中进行此测试时,它开始间歇性地失败并出现错误
21:38:55: AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f5247407220>" raised exception!
Traceback (most recent call last):
File "/opt/conda/envs/py3/lib/python3.8/site-packages/grpc/_plugin_wrapping.py", line 89, in __call__
self._metadata_plugin(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 101, in __call__
callback(self._get_authorization_headers(context), None)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 87, in _get_authorization_headers
self._credentials.before_request(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 134, in before_request
self.apply(headers)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 110, in apply
_helpers.from_bytes(token or self.token)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/_helpers.py", line 130, in from_bytes
raise ValueError("***0!r*** could not be converted to unicode".format(value))
ValueError: None could not be converted to unicode
Error: The operation was canceled.
不幸的是,这仅在我们的 CI 管道中失败,即使如此它也会间歇性地失败(仅在所有 CI 管道 运行 中的一小部分失败)。如果我 运行 在本地进行相同的测试,它每次都会成功。当 运行 在 CI 管道中时,代码正在作为服务帐户进行身份验证,而当我 运行 它在本地进行身份验证时,它正在作为我自己进行身份验证
我从错误消息中得知此代码失败:
if isinstance(result, six.text_type):
return result
else:
raise ValueError("{0!r} could not be converted to unicode".format(value))
它位于我们使用 pip 安装的 google 的 python 库中。
清楚表达:
isinstance(result, six.text_type)
正在评估 False
。当我在本地 运行 它时,我在该代码上放置了一个断点,发现在正常情况下(即当它工作时) result
的值是这样的:
这看起来像是某种身份验证令牌。
给出错误信息:
ValueError: None could not be converted to unicode
似乎 google 身份验证库正在执行的任何操作都将 None
传递给上面显示的代码。
我的知识范围在这里。鉴于这仅在 CI 管道中失败,我没有机会在我的代码中放置断点并对其进行调试。鉴于错误消息中的调用堆栈,这与身份验证有关。
我希望有人能就行动方案提出建议。
任何人都可以解释一种方法,通过它我可以发现为什么 None
被传递给引发错误的代码吗?
我们有同样的错误。最终通过使用 JSON Web Token 为每个 Google 的 Quckstart 进行身份验证解决了这个问题。像这样:
import json
from google.cloud import pubsub_v1
from google.auth import jwt
def post_messages(credentials_path, topic, list_of_messages):
credentials_dict = json.load(open(credentials_path,'r'))
audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_ob = jwt.Credentials.from_service_account_info(
credentials_dict, audience=audience
)
publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)
for message_dict in list_of_message_dicts:
message = json.dumps(message_dict, default=str).encode("utf-8")
future = publisher.publish(topic, message)
我们也更新了我们的环境,但它没有修复 ValueError
,直到我们更改为 jwt
。无论如何,这里是环境:
google-api-core==2.4.0
google-api-python-client==2.36.0
google-auth==2.3.2
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.1.0
google-cloud-pubsub==2.9.0
我们在 Google Cloud Dataflow 中内置了一个数据管道,它使用来自 pubsub 主题的消息并将它们流式传输到 BigQuery。为了测试它是否成功运行,我们在 CI 管道中进行了一些 运行 测试,这些测试将 post 消息发送到 pubsub 主题,并验证消息是否已成功写入 BigQuery。
这是 post 发布订阅主题的代码:
from google.cloud import pubsub_v1
def post_messages(project_id, topic_id, rows)
futures = dict()
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(
project_id, topic_id
)
def get_callback(f, data):
def callback(f):
try:
futures.pop(data)
except:
print("Please handle {} for {}.".format(f.exception(), data))
return callback
for row in rows:
# When you publish a message, the client returns a future. Data must be a bytestring
# ...
# construct a message in var json_data
# ...
message = json.dumps(json_data).encode("utf-8")
future = publisher.publish(
topic_path,
message
)
futures_key = str(message)
futures[futures_key] = future
future.add_done_callback(get_callback(future, futures_key))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(1)
当我们 运行 在我们的 CI 管道中进行此测试时,它开始间歇性地失败并出现错误
21:38:55: AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f5247407220>" raised exception!
Traceback (most recent call last):
File "/opt/conda/envs/py3/lib/python3.8/site-packages/grpc/_plugin_wrapping.py", line 89, in __call__
self._metadata_plugin(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 101, in __call__
callback(self._get_authorization_headers(context), None)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 87, in _get_authorization_headers
self._credentials.before_request(
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 134, in before_request
self.apply(headers)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 110, in apply
_helpers.from_bytes(token or self.token)
File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/_helpers.py", line 130, in from_bytes
raise ValueError("***0!r*** could not be converted to unicode".format(value))
ValueError: None could not be converted to unicode
Error: The operation was canceled.
不幸的是,这仅在我们的 CI 管道中失败,即使如此它也会间歇性地失败(仅在所有 CI 管道 运行 中的一小部分失败)。如果我 运行 在本地进行相同的测试,它每次都会成功。当 运行 在 CI 管道中时,代码正在作为服务帐户进行身份验证,而当我 运行 它在本地进行身份验证时,它正在作为我自己进行身份验证
我从错误消息中得知此代码失败:
if isinstance(result, six.text_type):
return result
else:
raise ValueError("{0!r} could not be converted to unicode".format(value))
它位于我们使用 pip 安装的 google 的 python 库中。
清楚表达:
isinstance(result, six.text_type)
正在评估 False
。当我在本地 运行 它时,我在该代码上放置了一个断点,发现在正常情况下(即当它工作时) result
的值是这样的:
这看起来像是某种身份验证令牌。
给出错误信息:
ValueError: None could not be converted to unicode
似乎 google 身份验证库正在执行的任何操作都将 None
传递给上面显示的代码。
我的知识范围在这里。鉴于这仅在 CI 管道中失败,我没有机会在我的代码中放置断点并对其进行调试。鉴于错误消息中的调用堆栈,这与身份验证有关。
我希望有人能就行动方案提出建议。
任何人都可以解释一种方法,通过它我可以发现为什么 None
被传递给引发错误的代码吗?
我们有同样的错误。最终通过使用 JSON Web Token 为每个 Google 的 Quckstart 进行身份验证解决了这个问题。像这样:
import json
from google.cloud import pubsub_v1
from google.auth import jwt
def post_messages(credentials_path, topic, list_of_messages):
credentials_dict = json.load(open(credentials_path,'r'))
audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
credentials_ob = jwt.Credentials.from_service_account_info(
credentials_dict, audience=audience
)
publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)
for message_dict in list_of_message_dicts:
message = json.dumps(message_dict, default=str).encode("utf-8")
future = publisher.publish(topic, message)
我们也更新了我们的环境,但它没有修复 ValueError
,直到我们更改为 jwt
。无论如何,这里是环境:
google-api-core==2.4.0
google-api-python-client==2.36.0
google-auth==2.3.2
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.1.0
google-cloud-pubsub==2.9.0