如何从 python 中的 dataflow/beam 查询数据存储

How to query datastore from dataflow/beam in python

看起来 google 已经发布了对从 python 中的 dataflow/beam 查询数据存储的支持。我正在尝试将其发送到 运行 本地,但我 运行 遇到了一些问题:

import apache_beam as beam
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from gcloud import datastore

client = datastore.Client('my-project')
query = client.query(kind='Document')

options = get_options()
p = beam.Pipeline(options=options)

entities = p | 'read' >> ReadFromDatastore(project='my-project', query=query)
entities | 'write' >> beam.io.Write(beam.io.TextFileSink('gs://output.txt'))

p.run()

这给了我一个

AttributeError: 'Query' object has no attribute 'HasField' [while running 'read/Split Query']

我猜我传入了错误的查询对象(有 3-4 个 pip 包可以从中导入数据存储)但我不知道应该传入哪个. 在测试中他们通过了protobuf。那是我必须使用的吗?如果这是我必须做的,任何人都可以使用 protobuf 显示一个简单的示例查询吗?

wordcount example 使用 protobufs 进行查询。

看起来你需要这样的东西:

from google.datastore.v1 import query_pb2
...
query = query_pb2.Query()
query.kind.add().name = 'Document'