Apache Beam 管道中的组元素
Group elements in Apache Beam pipeline
我有一个从 AVRO 文件解析记录的管道。
我需要将传入的记录分成 500 个项目的块,以便调用同时接受多个输入的 API。
有没有办法用 Python SDK 做到这一点?
我假设您指的是批处理用例。您有几个选择:
如果您的 PCollection 足够大,并且您对 bundle 的大小有一定的灵活性,则可以在按 random/round robin 顺序为元素分配键后使用 GroupByKey
转换。例如:
my_collection = p | ReadRecordsFromAvro()
element_bundles = (my_collection
# Choose a number of keys that works for you (I chose 50 here)
| 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
| 'MakeBundles' >> beam.GroupByKey()
| 'DropKeys' >> beam.Map(lambda (k, bundle): bundle)
| beam.ParDo(ProcessBundlesDoFn()))
其中 ProcessBundlesDoFn
是这样的:
class ProcessBundlesDoFn(beam.DoFn):
def process(self, bundle):
while bundle.has_next():
# Fetch in batches of 500 until you're done
result = fetch_n_elements(bundle, 500)
yield result
如果您需要所有恰好 500 个元素的包,那么您可能需要:
- 计算您的 PCollection 中元素的数量
- 将计数作为单例端输入传递给您的
'AddKeys'
ParDo,以准确确定您需要的键数。
希望对您有所帮助。
我有一个从 AVRO 文件解析记录的管道。
我需要将传入的记录分成 500 个项目的块,以便调用同时接受多个输入的 API。
有没有办法用 Python SDK 做到这一点?
我假设您指的是批处理用例。您有几个选择:
如果您的 PCollection 足够大,并且您对 bundle 的大小有一定的灵活性,则可以在按 random/round robin 顺序为元素分配键后使用 GroupByKey
转换。例如:
my_collection = p | ReadRecordsFromAvro()
element_bundles = (my_collection
# Choose a number of keys that works for you (I chose 50 here)
| 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
| 'MakeBundles' >> beam.GroupByKey()
| 'DropKeys' >> beam.Map(lambda (k, bundle): bundle)
| beam.ParDo(ProcessBundlesDoFn()))
其中 ProcessBundlesDoFn
是这样的:
class ProcessBundlesDoFn(beam.DoFn):
def process(self, bundle):
while bundle.has_next():
# Fetch in batches of 500 until you're done
result = fetch_n_elements(bundle, 500)
yield result
如果您需要所有恰好 500 个元素的包,那么您可能需要:
- 计算您的 PCollection 中元素的数量
- 将计数作为单例端输入传递给您的
'AddKeys'
ParDo,以准确确定您需要的键数。
希望对您有所帮助。