Implementing a Cloud Function to publish to Pub/Sub, triggered by GCS finalize

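I have been trying to write and deploy a Cloud Function in Python. (I gave up on Node.js because of its messy documentation and relatively fast pace of change.)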

It is meant to publish a message to a Pub/Sub topic, and to be triggered when a file finishes uploading to a Google Cloud Storage bucket ("finalize").

The command I use to deploy the function is

gcloud functions deploy hello_gcs_generic --runtime python37 --trigger-resource bucketcfpubsub

I have been trying to use this script provided by Google

import time

from google.cloud import pubsub_v1

project_id = "bucketcfpubsub"
topic_name = "projects/bucketcfpubsub/topics/pubsub"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

I get these errors in the Google Cloud Console:

ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3, in <module>
    from google.cloud import pubsub_v1
ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)

Following the instructions in another post, I copied requirements.txt from the helloworld code sample, containing only

google-cloud-error-reporting==0.30.0

and updated the other cloud components, such as bigquery, storage, and logging. Then I got these errors:

ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3, in <module>
    from google.cloud import pubsub_v1

I also found this post (ImportError: cannot import name 'pubsub_v1' from 'google.cloud' (unknown location)), but I really did not understand what the solution was. I tried replacing pubsub_v1 with google-cloud-pubsub==0.38.0, which did not help. I get this error:

Deploying function (may take a while - up to 2 minutes)...failed.
ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Code in file main.py can't be loaded.
Detailed stack trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 256, in check_or_load_user_function
    _function_handler.load_user_function()
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 166, in load_user_function
    spec.loader.exec_module(main)
  File "<frozen importlib._bootstrap_external>", line 724, in exec_module
  File "<frozen importlib._bootstrap_external>", line 860, in get_code
  File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/user_code/main.py", line 3

Besides, this does not seem like a sustainable solution if the code breaks as soon as Google updates pubsub to a new version?

So I am very much a beginner and quite lost, but I hope this write-up helps you help me.

UPDATE:

It seems that both pubsub and pubsub_v1 can be used, but I am not sure what the difference is.

@dustin I ran pip install -r requirements.txt and the result matched what you provided. I also noticed a mistake in deploying the function as 'hello-gcs-generic'; it should be changed to 'callback'.

The Python code now runs fine locally, but deploying it to the cloud with the command above (the first line of code in the OP) always returns this error

ERROR: (gcloud.functions.deploy) OperationError: code=3, message=Function load error: Error: function load attempt timed out.

You need to add google-cloud-pubsub to the requirements.txt file, not to the main.py file. It should look like this:

google-cloud-error-reporting==0.30.0
google-cloud-pubsub==0.38.0
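
Before redeploying, it can help to confirm locally that the pinned packages are actually installed in the environment you test in. The following is only a rough sketch: the helper names parse_pins and check_pins are hypothetical, it only understands simple `name==version` lines, and it assumes Python 3.8 or newer for importlib.metadata (newer than the python37 runtime used in the deploy command).

```python
from importlib import metadata  # standard library in Python 3.8+


def parse_pins(requirements_text):
    """Return {package: version} for every 'name==version' line."""
    pins = {}
    for line in requirements_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if "==" not in line:
            continue  # skip blank lines and unpinned entries
        name, version = line.split("==", 1)
        pins[name.strip()] = version.strip()
    return pins


def check_pins(pins):
    """Map each package to (pinned, installed); installed is None if missing."""
    report = {}
    for name, pinned in pins.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            installed = None
        report[name] = (pinned, installed)
    return report
```

For example, `check_pins(parse_pins(open("requirements.txt").read()))` lists each pin next to the locally installed version, so a missing google-cloud-pubsub shows up as None.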

There is a simpler Python quickstart example that does what you need. ;-)

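The example you quoted is more advanced. It shows how to publish messages with error handling. The while True: time.sleep(60) loop in the advanced example keeps the main thread alive until Ctrl+C or its equivalent stops the running program. The sleep is there so that we can wait for the callbacks on the publish futures to complete, rather than exiting the program immediately after the publish calls. Again, this is probably a bit too complicated for what you are trying to learn to do with Cloud Pub/Sub and Cloud Functions. I suggest steering clear of the advanced example and using the quickstart example.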
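
To illustrate the point about futures, here is a minimal sketch of waiting on the publish futures directly instead of sleeping forever. It assumes `publisher` behaves like a pubsub_v1.PublisherClient, meaning that `publish(topic_path, data=...)` returns a future whose `result(timeout=...)` either returns the message ID or raises. The helper name publish_all is hypothetical. In a Cloud Function this matters even more, because a module-level `while True` loop never lets main.py finish loading, which is likely related to the "function load attempt timed out" error.

```python
def publish_all(publisher, topic_path, payloads, timeout=30):
    """Publish every payload, then wait for each publish future to resolve.

    Returns (message_ids, errors). Waiting on the futures replaces the
    endless sleep loop: the call returns as soon as all futures are done.
    """
    # Start all publishes first so they can proceed in the background.
    pending = [publisher.publish(topic_path, data=p) for p in payloads]

    message_ids, errors = [], []
    for future in pending:
        try:
            # Blocks for at most `timeout` seconds per message.
            message_ids.append(future.result(timeout=timeout))
        except Exception as exc:  # the publish failed or timed out
            errors.append(exc)
    return message_ids, errors
```

The quickstart takes the same approach in a simpler form: it calls future.result() right after each publish, so no callback or sleep loop is needed.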

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')
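
The quickstart above runs as a plain script. As a rough sketch of how the same publish call might sit inside a storage-triggered background function, the block below wraps it in an entry point that takes (event, context). The values of PROJECT_ID and TOPIC_NAME are assumptions taken from the question, and build_payload, publish_finalize_event, and get_publisher are hypothetical helper names. It also assumes the finalize event dict carries "bucket" and "name" keys, and that google-cloud-pubsub is listed in requirements.txt.

```python
import json

# Assumed values for illustration; replace with your own project and topic.
PROJECT_ID = "bucketcfpubsub"
TOPIC_NAME = "pubsub"  # short name only; topic_path() adds the prefix

_publisher = None  # created lazily so that loading main.py stays fast


def get_publisher():
    """Create the Pub/Sub client on first use and reuse it afterwards."""
    global _publisher
    if _publisher is None:
        # Imported here so the module still loads where the package is absent.
        from google.cloud import pubsub_v1
        _publisher = pubsub_v1.PublisherClient()
    return _publisher


def build_payload(event):
    """Encode the bucket and object name of the event as JSON bytes."""
    body = {"bucket": event["bucket"], "name": event["name"]}
    return json.dumps(body).encode("utf-8")


def publish_finalize_event(event, publisher, topic_path, timeout=30):
    """Publish one message for the event and return its message ID."""
    future = publisher.publish(topic_path, data=build_payload(event))
    # Block until the publish completes so the function does not return early.
    return future.result(timeout=timeout)


def hello_gcs_generic(event, context):
    """Background function entry point: runs once per finalized object."""
    publisher = get_publisher()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)
    message_id = publish_finalize_event(event, publisher, topic_path)
    print("Published message ID {} for {}".format(message_id, event["name"]))
```

Nothing here runs at import time, so there is no module-level loop to stall loading. When deploying, the name passed to gcloud needs to match the Python function (here hello_gcs_generic), and a storage trigger normally also specifies the event type, for example --trigger-event google.storage.object.finalize; check the exact flags against your gcloud version.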