Google 数据流传递数据存储键作为输入参数
Google Data Flow Passing data store Key as input parameter
我正在尝试创建一个 google 数据流模板来读取 JSON 文件并将其加载到 google 数据存储。下面是我的代码。
我可以成功加载数据,但我想将数据存储 key/KIND 作为我模板的输入参数传递,并使用相同的方法创建实体。有人可以帮我传递代码吗?
下面是在 运行 时获取输入的代码片段。我有 --datastore_key 作为其中之一。
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--json_input',
dest='json_input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
parser.add_value_provider_argument(
'--project_id',
dest='project_id',
type=str,
required=False,
help='Input Project ID.')
parser.add_value_provider_argument(
'--datastore_key',
dest='datastore_key',
type=str,
required=False,
help='The Key name')
下面是我根据 instruction here.
将 datastore_key 分配给实体创建的片段
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id):
self.project_id = project_id
def start_bundle(self):
self.client = datastore.Client()
def start_datastore(self, datastore_key):
self.datastore_key = datastore_key
def process(self, an_int):
yield self.datastore_key.get() + an_int
def process(self, element):
try:
key = self.client.key(datastore_key ,element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
我正在创建如下管道,
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
如果我将其作为 运行 时间参数 传递,我将无法创建 数据存储密钥。如果我像这样硬编码它的工作
key = self.client.key('customer' ,element['customerNumber'])
我想要这样的东西
key = self.client.key(runtime_datastore_key ,runtime_datastore_id)
有人可以帮助我如何将数据存储 Key/Kind 作为 运行 时间参数传递吗?
谢谢,
GS
您似乎没有将 datastore_key
值提供程序传递给 CreateHbaseRow
。
尝试使用:
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id, datastore_key):
self.project_id = project_id
self.datastore_key = datastore_key
def start_bundle(self):
self.client = datastore.Client()
def process(self, element):
try:
key = self.client.key(datastore_key.get(), element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
请注意,我离开了 project_id,因为您似乎想要它,但我下面的代码没有使用它。
您还希望确保将相关值提供程序从 options
实例传递到您的 DoFn
。因此,您的管道创建代码变为:
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))
我正在尝试创建一个 google 数据流模板来读取 JSON 文件并将其加载到 google 数据存储。下面是我的代码。
我可以成功加载数据,但我想将数据存储 key/KIND 作为我模板的输入参数传递,并使用相同的方法创建实体。有人可以帮我传递代码吗?
下面是在 运行 时获取输入的代码片段。我有 --datastore_key 作为其中之一。
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--json_input',
dest='json_input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
parser.add_value_provider_argument(
'--project_id',
dest='project_id',
type=str,
required=False,
help='Input Project ID.')
parser.add_value_provider_argument(
'--datastore_key',
dest='datastore_key',
type=str,
required=False,
help='The Key name')
下面是我根据 instruction here.
将 datastore_key 分配给实体创建的片段class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id):
self.project_id = project_id
def start_bundle(self):
self.client = datastore.Client()
def start_datastore(self, datastore_key):
self.datastore_key = datastore_key
def process(self, an_int):
yield self.datastore_key.get() + an_int
def process(self, element):
try:
key = self.client.key(datastore_key ,element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
我正在创建如下管道,
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
如果我将其作为 运行 时间参数 传递,我将无法创建 数据存储密钥。如果我像这样硬编码它的工作
key = self.client.key('customer' ,element['customerNumber'])
我想要这样的东西
key = self.client.key(runtime_datastore_key ,runtime_datastore_id)
有人可以帮助我如何将数据存储 Key/Kind 作为 运行 时间参数传递吗?
谢谢, GS
您似乎没有将 datastore_key
值提供程序传递给 CreateHbaseRow
。
尝试使用:
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id, datastore_key):
self.project_id = project_id
self.datastore_key = datastore_key
def start_bundle(self):
self.client = datastore.Client()
def process(self, element):
try:
key = self.client.key(datastore_key.get(), element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
请注意,我离开了 project_id,因为您似乎想要它,但我下面的代码没有使用它。
您还希望确保将相关值提供程序从 options
实例传递到您的 DoFn
。因此,您的管道创建代码变为:
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))