将两个列表合并到 PCollection

combine two lists to PCollection

我正在使用 Apache Beam。写入 tfRecord 时,我需要包含项目的 ID 及其文本和嵌入。 本教程仅使用一个文本列表,但我还有一个 ID 列表以匹配文本列表,所以我想知道如何将 ID 传递给以下函数:

  def to_tf_example(entries):
  examples = []

  text_list, embedding_list = entries
  for i in range(len(text_list)):
    text = text_list[i]
    embedding = embedding_list[i]

    features = {
        # need to pass in ID here like so:
        'id': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[ids.encode('utf-8')])),
        'text': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[text.encode('utf-8')])),
        'embedding': tf.train.Feature(
            float_list=tf.train.FloatList(value=embedding.tolist()))
    }
  
    example = tf.train.Example(
        features=tf.train.Features(
            feature=features)).SerializeToString(deterministic=True)
  
    examples.append(example)
  
  return examples

我的第一个想法是将 ID 包含在我的数据库的文本列中,然后通过切片或正则表达式或其他方式提取它们,但想知道是否有更好的方法,我假设转换为 PCollection 但不不知道从哪里开始。这是管道:

    with beam.Pipeline(args.runner, options=options) as pipeline:
        query_data = pipeline | 'Read data from BigQuery' >> 
        beam.io.Read(beam.io.BigQuerySource(project='my-project', query=get_data(args.limit), use_standard_sql=True))
        # list of texts
        text = query_data | 'get list of text' >> beam.Map(lambda x: x['text'])
        # list of ids
        ids = query_data | 'get list of ids' >> beam.Map(lambda x: x['id'])
    
        ( text
            | 'Batch elements' >> util.BatchElements(
            min_batch_size=args.batch_size, max_batch_size=args.batch_size)
            | 'Generate embeddings' >> beam.Map(
            generate_embeddings, args.module_url, args.random_projection_matrix)
            | 'Encode to tf example' >> beam.FlatMap(to_tf_example)
            | 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
            file_path_prefix='{0}'.format(args.output_dir),
            file_name_suffix='.tfrecords')
        )

        query_data | 'Convert to entity and write to datastore' >> beam.Map(
                lambda input_features: create_entity(
                    input_features, args.kind))

这里我假设 generate_embeddings 有签名 List[str], ... -> (List[str], List[List[float]])

您要做的是避免将文本和 ID 拆分为单独的 PCollections。所以你可能想写这样的东西

def generate_embeddings_for_batch(
    batch,
    module_url,
    random_projection_matrix) -> Tuple[int, str, List[float]]:
  embeddings = generate_embeddings(
      [x['text'] for x in batch], module_url, random_projection_matrix)
  text_to_embedding = dict(embeddings)
  for id, text in batch:
    yield x['id'], x['text'], text_to_embedding[x['text']]

从那里你应该可以写 to_tf_example.

考虑使用 TFX 可能有意义。

我将 generate_embeddings 更改为 return List[int]、List[string]、List[List[float]],然后使用以下函数将 id 和文本列表传递到:

 def generate_embeddings_for_batch(batch, module_url, random_projection_matrix):
  embeddings = generate_embeddings([x['id'] for x in batch], [x['text'] for x in batch], module_url, random_projection_matrix)
  return embeddings