发布到 pubsub 主题时出现间歇性身份验证错误

Intermittent authentication error when posting to a pubsub topic

我们在 Google Cloud Dataflow 中内置了一个数据管道,它使用来自 pubsub 主题的消息并将它们流式传输到 BigQuery。为了测试它是否成功运行,我们在 CI 管道中进行了一些 运行 测试,这些测试将 post 消息发送到 pubsub 主题,并验证消息是否已成功写入 BigQuery。

这是 post 发布订阅主题的代码:

from google.cloud import pubsub_v1
def post_messages(project_id, topic_id, rows)
    futures = dict()
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(
        project_id, topic_id
    )

    def get_callback(f, data):
        def callback(f):
            try:
                futures.pop(data)
            except:
                print("Please handle {} for {}.".format(f.exception(), data))

        return callback

    for row in rows:
        # When you publish a message, the client returns a future. Data must be a bytestring
        # ...
        # construct a message in var json_data
        # ...
        message = json.dumps(json_data).encode("utf-8")
        future = publisher.publish(
            topic_path,
            message
        )
        futures_key = str(message)
        futures[futures_key] = future
        future.add_done_callback(get_callback(future, futures_key))
    # Wait for all the publish futures to resolve before exiting.
    while futures:
        time.sleep(1)

当我们 运行 在我们的 CI 管道中进行此测试时,它开始间歇性地失败并出现错误

21:38:55: AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f5247407220>" raised exception!
Traceback (most recent call last):
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/grpc/_plugin_wrapping.py", line 89, in __call__
    self._metadata_plugin(
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 101, in __call__
    callback(self._get_authorization_headers(context), None)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/transport/grpc.py", line 87, in _get_authorization_headers
    self._credentials.before_request(
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 134, in before_request
    self.apply(headers)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/credentials.py", line 110, in apply
    _helpers.from_bytes(token or self.token)
  File "/opt/conda/envs/py3/lib/python3.8/site-packages/google/auth/_helpers.py", line 130, in from_bytes
    raise ValueError("***0!r*** could not be converted to unicode".format(value))
ValueError: None could not be converted to unicode
Error: The operation was canceled.

不幸的是,这仅在我们的 CI 管道中失败,即使如此它也会间歇性地失败(仅在所有 CI 管道 运行 中的一小部分失败)。如果我 运行 在本地进行相同的测试,它每次都会成功。当 运行 在 CI 管道中时,代码正在作为服务帐户进行身份验证,而当我 运行 它在本地进行身份验证时,它正在作为我自己进行身份验证

我从错误消息中得知此代码失败:

if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError("{0!r} could not be converted to unicode".format(value))

https://github.com/googleapis/google-auth-library-python/blob/3c3fbf40b07e090f2be7fac5b304dbf438b5cd6c/google/auth/_helpers.py#L127-L130

它位于我们使用 pip 安装的 google 的 python 库中。

清楚表达:

isinstance(result, six.text_type)

正在评估 False。当我在本地 运行 它时,我在该代码上放置了一个断点,发现在正常情况下(即当它工作时) result 的值是这样的:

这看起来像是某种身份验证令牌。

给出错误信息:

ValueError: None could not be converted to unicode

似乎 google 身份验证库正在执行的任何操作都将 None 传递给上面显示的代码。

我的知识范围在这里。鉴于这仅在 CI 管道中失败,我没有机会在我的代码中放置断点并对其进行调试。鉴于错误消息中的调用堆栈,这与身份验证有关。

我希望有人能就行动方案提出建议。

任何人都可以解释一种方法,通过它我可以发现为什么 None 被传递给引发错误的代码吗?

我们有同样的错误。最终通过使用 JSON Web Token 为每个 Google 的 Quckstart 进行身份验证解决了这个问题。像这样:

import json
from google.cloud import pubsub_v1
from google.auth import jwt

def post_messages(credentials_path, topic, list_of_messages):

    credentials_dict = json.load(open(credentials_path,'r'))

    audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
    credentials_ob = jwt.Credentials.from_service_account_info(
        credentials_dict, audience=audience
    )

    publisher = pubsub_v1.PublisherClient(credentials=credentials_ob)

    for message_dict in list_of_message_dicts:
    
        message = json.dumps(message_dict, default=str).encode("utf-8")
    
        future = publisher.publish(topic, message)

我们也更新了我们的环境,但它没有修复 ValueError,直到我们更改为 jwt。无论如何,这里是环境:

google-api-core==2.4.0
google-api-python-client==2.36.0
google-auth==2.3.2
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
google-cloud-core==2.1.0
google-cloud-pubsub==2.9.0