导入 WriteToDatastore 时出错(Apache Beam/Google DataFlow)
error while importing WriteToDatastore (Apache Beam/Google DataFlow)
我正在尝试使用 Apache Beam 管道将实体写入 Google 云数据存储。为了进行测试,我在使用 Apache Beam instructions 设置的本地 Python 2.7 虚拟环境中执行此操作。编码是在本地的 Jupyter 笔记本中完成的。这是我正在尝试的伪代码:
import apache_beam as beam
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity
# Datastore kind under which the sample entities are created.
gds_entity_kind = 'test_entity'
# Datastore client; no project is passed explicitly, so the project id
# is taken from the environment.
storage = datastore.Client()
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it receives."""

    def process(self, element):
        """Print ``element`` to stdout.

        Args:
            element: The pipeline element handed to this DoFn.

        Returns:
            None.
        """
        print(element)
        return None
def create_entity(entity_id, name):
    """Build a Datastore ``Entity`` of kind ``gds_entity_kind``.

    Args:
        entity_id: Identifier for the entity key; converted with ``int()``
            before the key is built.
        name: Value stored under the ``'name'`` property.

    Returns:
        The ``Entity`` with its key and ``'name'`` property set.
    """
    key = storage.key(gds_entity_kind, int(entity_id))
    entity = Entity(key=key)
    entity.update({
        'name': name
    })
    return entity
# Sample input rows: two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]
# Test pipeline: split each row on ';', strip the single quotes, build one
# entity per row and print it. The Datastore write step stays disabled.
with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     # | 'write to datastore' >> WriteToDatastore()
     | 'debug print' >> beam.ParDo(PrintFn())
     )
我发现有一个类似的问题,但其答案似乎与我的情况无关?!我发现问题似乎与导入语句有关:
from google.cloud import datastore
相对于
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
如果我重新启动内核并只导入 WriteToDatastore,那么我不会收到任何错误。如果我尝试同时导入两者,则会出现以下错误。感谢您的帮助!
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
3 from google.cloud import datastore
4 from google.cloud.datastore.entity import Entity
5
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
21 import time
22
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
24 from apache_beam.io.gcp.datastore.v1 import query_splitter
25 from apache_beam.io.gcp.datastore.v1 import util
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
34 # pylint: disable=wrong-import-order, wrong-import-position
35 try:
---> 36 from google.cloud.proto.datastore.v1 import datastore_pb2
37 from google.cloud.proto.datastore.v1 import entity_pb2
38 from google.cloud.proto.datastore.v1 import query_pb2
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
15
16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
19
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
26 serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
27 ,
---> 28 dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
30
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
827 # TODO(amauryfa): use the pool passed as argument. This will work only
828 # for C++-implemented DescriptorPools.
--> 829 return _message.default_pool.AddSerializedFile(serialized_pb)
830 else:
831 return super(FileDescriptor, cls).__new__(cls)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
我发现将 Google Cloud Datastore 与 Apache Beam (Google Cloud Dataflow) 结合使用的编程概念不同于默认的 Datastore API。
您需要使用此示例(this example)中给出的 Datastore 辅助模块(即下面代码中导入的 googledatastore.helper)。借助它,我修改了代码,现在可以成功运行。请注意实体的导入方式和创建过程都与之前不同。
总而言之,它以完全限定的 JSON 表示法创建实体,您在 Google 云控制台中浏览数据存储时也会看到这种表示法。我的原始代码以更简单的 JSON 创建实体,在写入数据存储区时通常也可以理解该实体。总的来说,我避免了导致错误的原始两个不同导入的不同依赖项。
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter
#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity
def create_entity(_id, name):
    """Build a protobuf Datastore entity of kind ``"test_entity"``.

    Args:
        _id: Key path element passed to ``add_key_path`` after the kind.
        name: Value stored under the ``"name"`` property, converted with
            ``unicode()`` (Python 2).

    Returns:
        The populated ``entity_pb2.Entity`` message.
    """
    entity = entity_pb2.Entity()
    kind = "test_entity"
    # NOTE(review): the pipeline passes _id as a string (e.g. '0815'), not an
    # int; presumably this yields a name-based key rather than a numeric id --
    # confirm against the googledatastore helper.
    datastore_helper.add_key_path(entity.key, kind, _id)
    datastore_helper.add_properties(entity, {
        "name": unicode(name)
    })
    return entity
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it receives."""

    def process(self, element):
        """Print ``element`` to stdout.

        Args:
            element: The pipeline element handed to this DoFn.

        Returns:
            None.
        """
        print(element)
        return None
# Placeholder project id handed to WriteToDatastore; replace with a real one.
project_id = 'your-gcp-project-id'
# Sample input rows: two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]
# Pipeline: split each row on ';', strip the single quotes, build one
# protobuf entity per row and write it to Datastore for ``project_id``.
# The debug print step stays disabled.
with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     | 'write to datastore' >> WriteToDatastore(project_id)
     # | 'debug print' >> beam.ParDo(PrintFn())
     )
我正在尝试使用 Apache Beam 管道将实体写入 Google 云数据存储。为了进行测试,我在使用 Apache Beam instructions 设置的本地 Python 2.7 虚拟环境中执行此操作。编码是在本地的 Jupyter 笔记本中完成的。这是我正在尝试的伪代码:
import apache_beam as beam
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity
# Datastore kind under which the sample entities are created.
gds_entity_kind = 'test_entity'
# Datastore client; no project is passed explicitly, so the project id
# is taken from the environment.
storage = datastore.Client()
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it receives."""

    def process(self, element):
        """Print ``element`` to stdout.

        Args:
            element: The pipeline element handed to this DoFn.

        Returns:
            None.
        """
        print(element)
        return None
def create_entity(entity_id, name):
    """Build a Datastore ``Entity`` of kind ``gds_entity_kind``.

    Args:
        entity_id: Identifier for the entity key; converted with ``int()``
            before the key is built.
        name: Value stored under the ``'name'`` property.

    Returns:
        The ``Entity`` with its key and ``'name'`` property set.
    """
    key = storage.key(gds_entity_kind, int(entity_id))
    entity = Entity(key=key)
    entity.update({
        'name': name
    })
    return entity
# Sample input rows: two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]
# Test pipeline: split each row on ';', strip the single quotes, build one
# entity per row and print it. The Datastore write step stays disabled.
with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     # | 'write to datastore' >> WriteToDatastore()
     | 'debug print' >> beam.ParDo(PrintFn())
     )
我发现有一个类似的问题,但其答案似乎与我的情况无关?!我发现问题似乎与导入语句有关:
from google.cloud import datastore
相对于
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
如果我重新启动内核并只导入 WriteToDatastore,那么我不会收到任何错误。如果我尝试同时导入两者,则会出现以下错误。感谢您的帮助!
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
3 from google.cloud import datastore
4 from google.cloud.datastore.entity import Entity
5
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
21 import time
22
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
24 from apache_beam.io.gcp.datastore.v1 import query_splitter
25 from apache_beam.io.gcp.datastore.v1 import util
/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
34 # pylint: disable=wrong-import-order, wrong-import-position
35 try:
---> 36 from google.cloud.proto.datastore.v1 import datastore_pb2
37 from google.cloud.proto.datastore.v1 import entity_pb2
38 from google.cloud.proto.datastore.v1 import query_pb2
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
15
16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
19
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
26 serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
27 ,
---> 28 dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
30
/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
827 # TODO(amauryfa): use the pool passed as argument. This will work only
828 # for C++-implemented DescriptorPools.
--> 829 return _message.default_pool.AddSerializedFile(serialized_pb)
830 else:
831 return super(FileDescriptor, cls).__new__(cls)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto". To use it here, please add the necessary import.
我发现将 Google Cloud Datastore 与 Apache Beam (Google Cloud Dataflow) 结合使用的编程概念不同于默认的 Datastore API。
您需要使用此示例(this example)中给出的 Datastore 辅助模块(即下面代码中导入的 googledatastore.helper)。借助它,我修改了代码,现在可以成功运行。请注意实体的导入方式和创建过程都与之前不同。
总而言之,它以完全限定的 JSON 表示法创建实体,您在 Google 云控制台中浏览数据存储时也会看到这种表示法。我的原始代码以更简单的 JSON 创建实体,在写入数据存储区时通常也可以理解该实体。总的来说,我避免了导致错误的原始两个不同导入的不同依赖项。
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper, PropertyFilter
#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity
def create_entity(_id, name):
    """Build a protobuf Datastore entity of kind ``"test_entity"``.

    Args:
        _id: Key path element passed to ``add_key_path`` after the kind.
        name: Value stored under the ``"name"`` property, converted with
            ``unicode()`` (Python 2).

    Returns:
        The populated ``entity_pb2.Entity`` message.
    """
    entity = entity_pb2.Entity()
    kind = "test_entity"
    # NOTE(review): the pipeline passes _id as a string (e.g. '0815'), not an
    # int; presumably this yields a name-based key rather than a numeric id --
    # confirm against the googledatastore helper.
    datastore_helper.add_key_path(entity.key, kind, _id)
    datastore_helper.add_properties(entity, {
        "name": unicode(name)
    })
    return entity
class PrintFn(beam.DoFn):
    """Debugging DoFn that prints each element it receives."""

    def process(self, element):
        """Print ``element`` to stdout.

        Args:
            element: The pipeline element handed to this DoFn.

        Returns:
            None.
        """
        print(element)
        return None
# Placeholder project id handed to WriteToDatastore; replace with a real one.
project_id = 'your-gcp-project-id'
# Sample input rows: two single-quoted fields separated by ';'.
lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]
# Pipeline: split each row on ';', strip the single quotes, build one
# protobuf entity per row and write it to Datastore for ``project_id``.
# The debug print step stays disabled.
with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     | 'write to datastore' >> WriteToDatastore(project_id)
     # | 'debug print' >> beam.ParDo(PrintFn())
     )