Dataflow 无法将消息从 PubSub 推送到 BigQuery
Dataflow failing to push messages to BigQuery from PubSub
我正在尝试使用数据管道。我正在使用 Python 客户端库将记录插入 PubSub。 DataFlow 应该从那里拾取它,然后推入 BQ。数据流 failing.My 猜测是因为我没有正确的数据编码。我的代码如下所示:
data = base64.b64encode(message) publisher.publish(topic_path, data=data)
其中消息是一个字符串。
这是我要推送的 json 对象:
{ "current_speed" : "19.77", "_east" : "-87.654561", "_last_updt" :
"2018-07-17 15:31:30.0", "_region_id" : "1", "_north" : "42.026444",
"_south" : "41.997946", "region" : "Rogers Park - West Ridge", "_west"
: "-87.709645", "_description" : "North of Devon. Kedzie to Lake
Shore" }
我已经尝试了几个变体,我可以在 pubsub 中看到 b64 或 json 中的数据。
当我看到json时,我是这样看的:
─────┬────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
─────┼────────────┤
│ {u'_south': u'41.997946', u'_north': u'42.026444', u'_description': u'North of Devon. Kedzie to Lake Shore', u'_east': u'-87.654561', u'region': u'Rogers Park - West Ridge', u'_west': u'-87.709645', u'current_speed': u'21.82', u'_last_updt': u'2018-07-18 10:10:48.0', u'_region_id': u'1'} │ 154626108014988 │ │
└───────────────────────────────────────────────────────────────────────────
注意每个元素前面的附加 u。那是因为我在做 UTF-8 编码。这是把事情搞砸了吗?我在用
data = data.encode('utf-8')
这段代码按照此处所述执行 utf-8:https://cloud.google.com/pubsub/docs/publisher
我正在使用以下命令在 pubsub 中检查我的内容:
gcloud pubsub subscriptions pull --auto-ack debug_subscription
问。我应该在主题中看到什么? json 还是二进制?是否有任何 python 示例显示加密有效负载的正确方法,以便 pubsub 将其提取到 BQ 模板?
为什么使用data = base64.b64encode(message)
? message
到底是什么?
我尝试使用此代码段 Pub/Sub 到 BigQuery 提供的数据流模板,它有效:
def publish_messages(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
for n in range(1, 6):
data = u'{"column1": "value1","column2": "value2"}'
# Data must be a bytestring
data = data.encode('utf-8')
publisher.publish(topic_path, data=data)
print('Published messages.')
不使用 base64 编码试试这个。
我正在尝试使用数据管道。我正在使用 Python 客户端库将记录插入 PubSub。 DataFlow 应该从那里拾取它,然后推入 BQ。数据流 failing.My 猜测是因为我没有正确的数据编码。我的代码如下所示:
data = base64.b64encode(message) publisher.publish(topic_path, data=data)
其中消息是一个字符串。 这是我要推送的 json 对象:
{ "current_speed" : "19.77", "_east" : "-87.654561", "_last_updt" : "2018-07-17 15:31:30.0", "_region_id" : "1", "_north" : "42.026444", "_south" : "41.997946", "region" : "Rogers Park - West Ridge", "_west" : "-87.709645", "_description" : "North of Devon. Kedzie to Lake Shore" }
我已经尝试了几个变体,我可以在 pubsub 中看到 b64 或 json 中的数据。
当我看到json时,我是这样看的:
─────┬────────────┐ │ DATA │ MESSAGE_ID │ ATTRIBUTES │ ─────┼────────────┤ │ {u'_south': u'41.997946', u'_north': u'42.026444', u'_description': u'North of Devon. Kedzie to Lake Shore', u'_east': u'-87.654561', u'region': u'Rogers Park - West Ridge', u'_west': u'-87.709645', u'current_speed': u'21.82', u'_last_updt': u'2018-07-18 10:10:48.0', u'_region_id': u'1'} │ 154626108014988 │ │ └───────────────────────────────────────────────────────────────────────────
注意每个元素前面的附加 u。那是因为我在做 UTF-8 编码。这是把事情搞砸了吗?我在用
data = data.encode('utf-8')
这段代码按照此处所述执行 utf-8:https://cloud.google.com/pubsub/docs/publisher
我正在使用以下命令在 pubsub 中检查我的内容:
gcloud pubsub subscriptions pull --auto-ack debug_subscription
问。我应该在主题中看到什么? json 还是二进制?是否有任何 python 示例显示加密有效负载的正确方法,以便 pubsub 将其提取到 BQ 模板?
为什么使用data = base64.b64encode(message)
? message
到底是什么?
我尝试使用此代码段 Pub/Sub 到 BigQuery 提供的数据流模板,它有效:
def publish_messages(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)
for n in range(1, 6):
data = u'{"column1": "value1","column2": "value2"}'
# Data must be a bytestring
data = data.encode('utf-8')
publisher.publish(topic_path, data=data)
print('Published messages.')
不使用 base64 编码试试这个。