将两个列表合并到 PCollection
combine two lists to PCollection
我正在使用 Apache Beam。写入 tfRecord 时,我需要包含项目的 ID 及其文本和嵌入。
本教程仅使用一个文本列表,但我还有一个 ID 列表以匹配文本列表,所以我想知道如何将 ID 传递给以下函数:
def to_tf_example(entries):
examples = []
text_list, embedding_list = entries
for i in range(len(text_list)):
text = text_list[i]
embedding = embedding_list[i]
features = {
# need to pass in ID here like so:
'id': tf.train.Feature(
bytes_list=tf.train.BytesList(value=[ids.encode('utf-8')])),
'text': tf.train.Feature(
bytes_list=tf.train.BytesList(value=[text.encode('utf-8')])),
'embedding': tf.train.Feature(
float_list=tf.train.FloatList(value=embedding.tolist()))
}
example = tf.train.Example(
features=tf.train.Features(
feature=features)).SerializeToString(deterministic=True)
examples.append(example)
return examples
我的第一个想法是将 ID 包含在我的数据库的文本列中,然后通过切片或正则表达式或其他方式提取它们,但想知道是否有更好的方法,我假设转换为 PCollection 但不不知道从哪里开始。这是管道:
with beam.Pipeline(args.runner, options=options) as pipeline:
query_data = pipeline | 'Read data from BigQuery' >>
beam.io.Read(beam.io.BigQuerySource(project='my-project', query=get_data(args.limit), use_standard_sql=True))
# list of texts
text = query_data | 'get list of text' >> beam.Map(lambda x: x['text'])
# list of ids
ids = query_data | 'get list of ids' >> beam.Map(lambda x: x['id'])
( text
| 'Batch elements' >> util.BatchElements(
min_batch_size=args.batch_size, max_batch_size=args.batch_size)
| 'Generate embeddings' >> beam.Map(
generate_embeddings, args.module_url, args.random_projection_matrix)
| 'Encode to tf example' >> beam.FlatMap(to_tf_example)
| 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
file_path_prefix='{0}'.format(args.output_dir),
file_name_suffix='.tfrecords')
)
query_data | 'Convert to entity and write to datastore' >> beam.Map(
lambda input_features: create_entity(
input_features, args.kind))
这里我假设 generate_embeddings
有签名 List[str], ... -> (List[str], List[List[float]])
您要做的是避免将文本和 ID 拆分为单独的 PCollections。所以你可能想写这样的东西
def generate_embeddings_for_batch(
batch,
module_url,
random_projection_matrix) -> Tuple[int, str, List[float]]:
embeddings = generate_embeddings(
[x['text'] for x in batch], module_url, random_projection_matrix)
text_to_embedding = dict(embeddings)
for id, text in batch:
yield x['id'], x['text'], text_to_embedding[x['text']]
从那里你应该可以写 to_tf_example.
考虑使用 TFX 可能有意义。
我将 generate_embeddings 更改为 return List[int]、List[string]、List[List[float]],然后使用以下函数将 id 和文本列表传递到:
def generate_embeddings_for_batch(batch, module_url, random_projection_matrix):
embeddings = generate_embeddings([x['id'] for x in batch], [x['text'] for x in batch], module_url, random_projection_matrix)
return embeddings
我正在使用 Apache Beam。写入 tfRecord 时,我需要包含项目的 ID 及其文本和嵌入。 本教程仅使用一个文本列表,但我还有一个 ID 列表以匹配文本列表,所以我想知道如何将 ID 传递给以下函数:
def to_tf_example(entries):
examples = []
text_list, embedding_list = entries
for i in range(len(text_list)):
text = text_list[i]
embedding = embedding_list[i]
features = {
# need to pass in ID here like so:
'id': tf.train.Feature(
bytes_list=tf.train.BytesList(value=[ids.encode('utf-8')])),
'text': tf.train.Feature(
bytes_list=tf.train.BytesList(value=[text.encode('utf-8')])),
'embedding': tf.train.Feature(
float_list=tf.train.FloatList(value=embedding.tolist()))
}
example = tf.train.Example(
features=tf.train.Features(
feature=features)).SerializeToString(deterministic=True)
examples.append(example)
return examples
我的第一个想法是将 ID 包含在我的数据库的文本列中,然后通过切片或正则表达式或其他方式提取它们,但想知道是否有更好的方法,我假设转换为 PCollection 但不不知道从哪里开始。这是管道:
with beam.Pipeline(args.runner, options=options) as pipeline:
query_data = pipeline | 'Read data from BigQuery' >>
beam.io.Read(beam.io.BigQuerySource(project='my-project', query=get_data(args.limit), use_standard_sql=True))
# list of texts
text = query_data | 'get list of text' >> beam.Map(lambda x: x['text'])
# list of ids
ids = query_data | 'get list of ids' >> beam.Map(lambda x: x['id'])
( text
| 'Batch elements' >> util.BatchElements(
min_batch_size=args.batch_size, max_batch_size=args.batch_size)
| 'Generate embeddings' >> beam.Map(
generate_embeddings, args.module_url, args.random_projection_matrix)
| 'Encode to tf example' >> beam.FlatMap(to_tf_example)
| 'Write to TFRecords files' >> beam.io.WriteToTFRecord(
file_path_prefix='{0}'.format(args.output_dir),
file_name_suffix='.tfrecords')
)
query_data | 'Convert to entity and write to datastore' >> beam.Map(
lambda input_features: create_entity(
input_features, args.kind))
这里我假设 generate_embeddings
有签名 List[str], ... -> (List[str], List[List[float]])
您要做的是避免将文本和 ID 拆分为单独的 PCollections。所以你可能想写这样的东西
def generate_embeddings_for_batch(
batch,
module_url,
random_projection_matrix) -> Tuple[int, str, List[float]]:
embeddings = generate_embeddings(
[x['text'] for x in batch], module_url, random_projection_matrix)
text_to_embedding = dict(embeddings)
for id, text in batch:
yield x['id'], x['text'], text_to_embedding[x['text']]
从那里你应该可以写 to_tf_example.
考虑使用 TFX 可能有意义。
我将 generate_embeddings 更改为 return List[int]、List[string]、List[List[float]],然后使用以下函数将 id 和文本列表传递到:
def generate_embeddings_for_batch(batch, module_url, random_projection_matrix):
embeddings = generate_embeddings([x['id'] for x in batch], [x['text'] for x in batch], module_url, random_projection_matrix)
return embeddings