Apache Beam Google Datastore ReadFromDatastore entity protobuf

I am trying to use Apache Beam's Google Datastore API to read entities with ReadFromDatastore:

p = beam.Pipeline(options=options)
(p
 | 'Read from Datastore' >> ReadFromDatastore(gcloud_options.project, query)
 | 'reformat'            >> beam.Map(reformat)
 | 'Write To Datastore'  >> WriteToDatastore(gcloud_options.project))

The objects passed to my reformat function are of type

google.cloud.proto.datastore.v1.entity_pb2.Entity

They are in protobuf format, which is hard to modify or read.

I think I can convert an entity_pb2.Entity to a dict with

entity = dict(google.cloud.datastore.helpers._property_tuples(entity_pb))

But for some reason I get errors when I try to import both of the following libraries:

import google.cloud.datastore.helpers  
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore 

Error:

Traceback (most recent call last):
  File "/home/nburn42/MotoGarage/MotoGarage/MotoGarageBackgroundJobs/format_data.py", line 16, in <module>
    import google.cloud.datastore.helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/__init__.py", line 57, in <module>
    from google.cloud.datastore.batch import Batch
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 24, in <module>
    from google.cloud.datastore import helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/helpers.py", line 29, in <module>
    from google.cloud.grpc.datastore.v1 import entity_pb2 as _entity_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/grpc/datastore/v1/entity_pb2.py", line 28, in <module>
    dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/descriptor.py", line 824, in __new__
    return _message.default_pool.AddSerializedFile(serialized_pb)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/grpc/datastore/v1/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/grpc/datastore/v1/entity.proto".  To use it here, please add the necessary import.

Is there anything I can do to convert an entity_pb2.Entity into something usable?
Is ReadFromDatastore too new for practical use right now?
Should I use another approach?

Thanks,
Nathan

You can use the function google.cloud.datastore.helpers.entity_from_protobuf to convert an entity_pb2.Entity into a google.cloud.datastore.entity.Entity.

google.cloud.datastore.entity.Entity is a subclass of dict, which gives you the usability you are looking for.

Another (simpler) way to specify the query is as follows:

import apache_beam as beam
from google.cloud import datastore
from google.cloud.datastore import query as datastore_query
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore

p = beam.Pipeline(options=pipeline_options)
ds_client = datastore.Client(project=project)
query = ds_client.query(kind=kind)
# possible filter: query.add_filter('column','operator',criteria) 
# query.add_filter('age','>',18)
# query.add_filter('name','=',"John")
query = datastore_query._pb_from_query(query)

p | 'ReadFromDatastore' >> ReadFromDatastore(project=project, query=query)
p.run().wait_until_finish()

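When you submit the job to the DataflowRunner (in the cloud), make sure your local requirements match the setup.py file that you ship to Google Cloud. In my experience, I had to install apache beam 2.1.0 on my local machine and specify the same version in setup.py for the job to run on the cloud workers.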
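One way to catch a mismatch before submitting is to compare the locally installed Beam version against the pin in setup.py. The following is only a minimal sketch: pinned_version and local_matches_setup are hypothetical helper names, and it assumes the install_requires list uses an exact pin of the form apache-beam[gcp]==2.1.0.

```python
import re

# Matches an exact pin such as "apache-beam[gcp]==2.1.0".
_PIN = re.compile(r'^\s*([A-Za-z0-9._-]+)(?:\[[^\]]*\])?\s*==\s*([^\s;]+)')


def pinned_version(requirement, package='apache-beam'):
    """Return the exactly pinned version for `package`, or None if not pinned."""
    match = _PIN.match(requirement)
    if match and match.group(1).lower() == package:
        return match.group(2)
    return None


def local_matches_setup(installed_version, install_requires, package='apache-beam'):
    """True if `package` is pinned in install_requires and every pin equals the local version."""
    pins = [pinned_version(req, package) for req in install_requires]
    pins = [pin for pin in pins if pin is not None]
    return bool(pins) and all(pin == installed_version for pin in pins)
```

In a real project the first argument would typically be apache_beam.__version__ and the second the same list that is passed to setuptools.setup(install_requires=...).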

I ran into the same problem, and the approach above did not work for me.

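The OP has 3 questions:

1. Is there anything I can do to convert an entity_pb2.Entity into something usable?

You did not say exactly what difficulty you had with the returned value, but every instance of entity_pb2.Entity should have a properties attribute. You should be able to use it to get values out of your entity, for example: property_value = entity.properties.get('<your_property_name>')

Update: I think I now understand what the OP meant by "usable". Even if you do property_value = entity.properties.get('<your_property_name>'), the value you get in property_value is still in protocol buffer format. So to get a dict of properties you can do this: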

from googledatastore import helper

value_dict = {prop_name: helper.get_value(entity.properties.get(prop_name)) for prop_name in entity.properties}
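If you would rather not depend on a helper module at all, the same conversion can be sketched by inspecting the value_type oneof on each property value. This is only an illustration under assumptions: value_to_python and entity_to_dict are hypothetical names, the field names are the ones listed in the traceback in the question, and key and geo-point values are deliberately left as protobuf messages.

```python
# Members of the Value.value_type oneof that map directly to Python values.
_SCALAR_FIELDS = ('boolean_value', 'integer_value', 'double_value',
                  'string_value', 'blob_value')


def value_to_python(value):
    """Convert one Datastore Value protobuf into a plain Python object."""
    kind = value.WhichOneof('value_type')  # name of the set member, or None
    if kind is None or kind == 'null_value':
        return None
    if kind in _SCALAR_FIELDS:
        return getattr(value, kind)
    if kind == 'timestamp_value':
        # google.protobuf.Timestamp provides ToDatetime().
        return value.timestamp_value.ToDatetime()
    if kind == 'array_value':
        return [value_to_python(item) for item in value.array_value.values]
    if kind == 'entity_value':
        return entity_to_dict(value.entity_value)
    # key_value and geo_point_value are returned unchanged as protobuf messages.
    return getattr(value, kind)


def entity_to_dict(entity):
    """Convert an entity_pb2.Entity into a dict of property name -> Python value."""
    return {name: value_to_python(value)
            for name, value in entity.properties.items()}
```

Passing entity_to_dict to beam.Map in place of reformat would then give each downstream step a plain dict, although the entity key is not included and would have to be carried separately if you need to write the entity back.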

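2. Is ReadFromDatastore too new for practical use right now?

I initially thought the same, but I now seem to have it working (see my answer to Q3 below).

3. Should I use another approach?

You absolutely must not import the google-cloud-datastore library into your project. Doing so causes the TypeError: Couldn't build proto file into descriptor pool! error from the original question to be raised when you import ReadFromDatastore from apache_beam.

From the investigation and debugging I have done, it appears that the current version of the apache-beam library (v2.8.0) is not compatible with the google-cloud-datastore library (v1.7.1). This means we have to use the bundled googledatastore library (v7.0.1) instead to achieve what we want.

Further reading / references: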

https://cloud.google.com/blog/products/gcp/how-to-do-data-processing-and-analytics-from-google-app-engine-with-google-cloud-dataflow

https://github.com/amygdala/gae-dataflow

https://gcloud-python.readthedocs.io/en/0.10.0/_modules/gcloud/datastore/helpers.html

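The latest version of Apache Beam, 2.13, deprecates this old approach based on the legacy googledatastore library and adds a new implementation that uses the newer and more human-friendly google-cloud-datastore library.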

https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.io.gcp.datastore.v1new.datastoreio.html

https://github.com/apache/beam/pull/8262

There is still an open issue for adding examples, so for now you will have to work that part out yourself.

https://issues.apache.org/jira/browse/BEAM-7350
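As a starting point, here is a rough sketch of what a read with the v1new module might look like. It is based on my reading of the pydoc linked above rather than on an official example, so treat the details as assumptions: that v1new.types exposes a Query class accepting kind and project, that entities expose a properties dict and a key with path_elements, and that the names entity_to_row and build_pipeline are purely illustrative. The Beam imports are kept inside build_pipeline so that entity_to_row can be exercised without Beam installed.

```python
def entity_to_row(entity):
    """Flatten a v1new-style entity into a plain dict (assumed attributes)."""
    row = dict(entity.properties)  # copy so the entity itself is not mutated
    row['_key_path'] = tuple(entity.key.path_elements)
    return row


def build_pipeline(project, kind, pipeline_options=None):
    """Build (but do not run) a pipeline that reads one kind from Datastore."""
    # Imported lazily; requires apache-beam[gcp] >= 2.13.
    import apache_beam as beam
    from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
    from apache_beam.io.gcp.datastore.v1new.types import Query

    pipeline = beam.Pipeline(options=pipeline_options)
    _ = (pipeline
         | 'Read from Datastore' >> ReadFromDatastore(Query(kind=kind, project=project))
         | 'To dict' >> beam.Map(entity_to_row))
    return pipeline
```

Calling build_pipeline(project, kind).run().wait_until_finish() would then execute the read, assuming credentials and the project are set up as for the original pipeline.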