如何编写 Cloud Function 来接收、解析和发布 PubSub 消息?
How do I write a Cloud Function to receive, parse, and publish PubSub messages?
-
google-cloud-platform
-
google-cloud-storage
-
google-cloud-functions
-
google-cloud-pubsub
-
google-cloud-python
这可以被视为 this thread 的后续行动,但我需要更多帮助来推进工作。希望有人可以看看我下面的尝试并提供进一步的指导。
总而言之,我需要一个
的云函数
- 由主题 A 中发布的 PubSub 消息触发(这可以在 UI 中完成)。
- 在 "push" PubSub 主题 A 中读取杂乱的对象更改通知消息。
- "parse"它
- 在 PubSub 主题 B 中发布消息,以原始消息 ID 为数据,其他元数据(例如文件名、大小、时间)为属性。
。 1:
杂乱对象更改通知示例:
\n "kind": "storage#object",\n "id": "bucketcfpubsub/test.txt/1544681756538155",\n "selfLink": "https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt",\n "name": "test.txt",\n "bucket": "bucketcfpubsub",\n "generation": "1544681756538155",\n "metageneration": "1",\n "contentType": "text/plain",\n "timeCreated": "2018-12-13T06:15:56.537Z",\n "updated": "2018-12-13T06:15:56.537Z",\n "storageClass": "STANDARD",\n "timeStorageClassUpdated": "2018-12-13T06:15:56.537Z",\n "size": "1938",\n "md5Hash": "sDSXIvkR/PBg4mHyIUIvww==",\n "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media",\n "crc32c": "UDhyzw==",\n "etag": "CKvqjvuTnN8CEAE="\n}\n
澄清一下,这是一条带有空白 "data" 字段的消息,并且上面的所有信息都是属性对(如 "attribute name": "attribute data")?或者它只是一个长字符串塞进 "data" 字段,没有 "attributes"?
。 2:
在上面的线程中,使用了 "pull" 订阅。它比使用 "push" 订阅更好吗?推送示例如下:
def create_push_subscription(project_id,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
push_config = pubsub_v1.types.PushConfig(
push_endpoint=endpoint)
subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config)
print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]
或者在此之后我还需要更多代码才能接收消息吗?
此外,每次发布 pubsub 消息触发 Cloud Function 时,这不会创建一个新订阅者吗?我应该在 CF 的末尾添加一个订阅删除代码,还是有更有效的方法来做到这一点?
。 3:
接下来解析代码,本示例代码做了如下几个属性:
def summarize(message):
# [START parse_message]
data = message.data
attributes = message.attributes
event_type = attributes['eventType']
bucket_id = attributes['bucketId']
object_id = attributes['objectId']
这是否适用于我在 1: 中的上述通知?
。 4:
如何分隔 topic_name?步骤1和2使用主题A,而这一步是发布到主题B。是不是就像在下面的代码示例中重写 topic_name 一样简单?
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
print('Published messages with custom attributes.')
我从中获得大部分示例代码的来源(除了上述线程):python-docs-samples。将上述代码示例改编和串联在一起会产生有用的代码吗?或者我还会缺少像 "import ****" 这样的东西吗?
您不应尝试在 Cloud Functions 中手动创建订阅者 运行。相反,请按照文档 here 设置云函数,通过传递 --trigger-topic
命令行参数,将使用发送到给定主题的所有消息调用该函数。
解决您的一些其他问题:
“我是否应该在CF的末尾添加一个订阅删除代码”-订阅是长期存在的资源,对应于特定的消息积压。如果在云函数末尾创建和删除订阅,订阅不存在时发送的消息将不会收到。
“我如何分隔 topic_name”- 此示例中的“topic_name”指的是将出现在 projects/project_id/topics/topic_name
格式的字符串的最后一部分=13=] 创建主题后在云控制台中。
google-cloud-platform
google-cloud-storage
google-cloud-functions
google-cloud-pubsub
google-cloud-python
这可以被视为 this thread 的后续行动,但我需要更多帮助来推进工作。希望有人可以看看我下面的尝试并提供进一步的指导。
总而言之,我需要一个
的云函数- 由主题 A 中发布的 PubSub 消息触发(这可以在 UI 中完成)。
- 在 "push" PubSub 主题 A 中读取杂乱的对象更改通知消息。
- "parse"它
- 在 PubSub 主题 B 中发布消息,以原始消息 ID 为数据,其他元数据(例如文件名、大小、时间)为属性。
。 1:
杂乱对象更改通知示例:
\n "kind": "storage#object",\n "id": "bucketcfpubsub/test.txt/1544681756538155",\n "selfLink": "https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt",\n "name": "test.txt",\n "bucket": "bucketcfpubsub",\n "generation": "1544681756538155",\n "metageneration": "1",\n "contentType": "text/plain",\n "timeCreated": "2018-12-13T06:15:56.537Z",\n "updated": "2018-12-13T06:15:56.537Z",\n "storageClass": "STANDARD",\n "timeStorageClassUpdated": "2018-12-13T06:15:56.537Z",\n "size": "1938",\n "md5Hash": "sDSXIvkR/PBg4mHyIUIvww==",\n "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media",\n "crc32c": "UDhyzw==",\n "etag": "CKvqjvuTnN8CEAE="\n}\n
澄清一下,这是一条带有空白 "data" 字段的消息,并且上面的所有信息都是属性对(如 "attribute name": "attribute data")?或者它只是一个长字符串塞进 "data" 字段,没有 "attributes"?
。 2:
在上面的线程中,使用了 "pull" 订阅。它比使用 "push" 订阅更好吗?推送示例如下:
def create_push_subscription(project_id,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]
from google.cloud import pubsub_v1
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
push_config = pubsub_v1.types.PushConfig(
push_endpoint=endpoint)
subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config)
print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]
或者在此之后我还需要更多代码才能接收消息吗?
此外,每次发布 pubsub 消息触发 Cloud Function 时,这不会创建一个新订阅者吗?我应该在 CF 的末尾添加一个订阅删除代码,还是有更有效的方法来做到这一点?
。 3:
接下来解析代码,本示例代码做了如下几个属性:
def summarize(message):
# [START parse_message]
data = message.data
attributes = message.attributes
event_type = attributes['eventType']
bucket_id = attributes['bucketId']
object_id = attributes['objectId']
这是否适用于我在 1: 中的上述通知?
。 4:
如何分隔 topic_name?步骤1和2使用主题A,而这一步是发布到主题B。是不是就像在下面的代码示例中重写 topic_name 一样简单?
# TODO topic_name = "Your Pub/Sub topic name"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
print('Published messages with custom attributes.')
我从中获得大部分示例代码的来源(除了上述线程):python-docs-samples。将上述代码示例改编和串联在一起会产生有用的代码吗?或者我还会缺少像 "import ****" 这样的东西吗?
您不应尝试在 Cloud Functions 中手动创建订阅者 运行。相反,请按照文档 here 设置云函数,通过传递 --trigger-topic
命令行参数,将使用发送到给定主题的所有消息调用该函数。
解决您的一些其他问题:
“我是否应该在CF的末尾添加一个订阅删除代码”-订阅是长期存在的资源,对应于特定的消息积压。如果在云函数末尾创建和删除订阅,订阅不存在时发送的消息将不会收到。
“我如何分隔 topic_name”- 此示例中的“topic_name”指的是将出现在 projects/project_id/topics/topic_name
格式的字符串的最后一部分=13=] 创建主题后在云控制台中。