Google 数据流传递数据存储键作为输入参数

Google Data Flow Passing data store Key as input parameter

我正在尝试创建一个 google 数据流模板来读取 JSON 文件并将其加载到 google 数据存储。下面是我的代码。

我可以成功加载数据,但我想将数据存储 key/KIND 作为我模板的输入参数传递,并使用相同的方法创建实体。有人可以帮我传递代码吗?

下面是在 运行 时获取输入的代码片段。我有 --datastore_key 作为其中之一。

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--json_input',
                dest='json_input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

        parser.add_value_provider_argument(
                '--project_id',
                dest='project_id',
                type=str,
                required=False,
                help='Input Project ID.')

        parser.add_value_provider_argument(
                '--datastore_key',
                dest='datastore_key',
                type=str,
                required=False,
                help='The Key name')

下面是我根据 instruction here.

将 datastore_key 分配给实体创建的片段
class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id):
       self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key ,element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

我正在创建如下管道,

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))

如果我将其作为 运行 时间参数 传递,我将无法创建 数据存储密钥。如果我像这样硬编码它的工作

key = self.client.key('customer' ,element['customerNumber'])

我想要这样的东西

key = self.client.key(runtime_datastore_key ,runtime_datastore_id)

有人可以帮助我如何将数据存储 Key/Kind 作为 运行 时间参数传递吗?

谢谢, GS

您似乎没有将 datastore_key 值提供程序传递给 CreateHbaseRow


尝试使用:

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id, datastore_key):
       self.project_id = project_id
       self.datastore_key = datastore_key

    def start_bundle(self):
        self.client = datastore.Client()

    def process(self, element):
        try:
            key = self.client.key(datastore_key.get(), element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

请注意,我离开了 project_id,因为您似乎想要它,但我下面的代码没有使用它


您还希望确保将相关值提供程序从 options 实例传递到您的 DoFn。因此,您的管道创建代码变为:

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))