Apache Beam Python SDK with Pub/Sub source stuck at runtime
Apache Beam Python SDK with Pub/Sub source stuck at runtime
我正在使用 Python SDK 在 Apache Beam 中编写程序以从 Pub/Sub 文件中读取 JSON 文件的内容,并对接收到的字符串进行一些处理。这是程序中我从 Pub/Sub 中提取内容并进行处理的部分:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))
def json_to_tuple(jsonStr):
res = json.loads(jsonStr)
##printing retutn value
print (res['id'], res['messageSize'])
##
return (res['id'], res['messageSize'])
tupled = lines_split | beam.Map(json_to_tuple)
def printlines(line):
print line
result = tupled | beam.CombinePerKey(sum)
result | beam.Map(printlines)
虽然 运行 程序,代码在创建 PCollection tupled
后卡住了(之后没有代码行在执行)。奇怪的是,当我将源代码从 Pub/Sub 更改为包含 完全相同的 内容(使用 ReadFromText()
)的本地文件时,程序运行良好.
这种行为的原因可能是什么?
根据 Pub/Sub I/O 文档(Apache Beam docs and Dataflow Pub/Sub I/O docs),默认情况下,PubsubIO 转换与无界 PCollections 一起工作。
PCollections 可以是 bounded or unbounded:
- 有界:数据来自固定源,如文件。
- 无界:数据来自不断更新的来源,例如Pub/Sub订阅。
在对无界 PCollection 进行操作之前,您必须使用以下策略之一:
- Windowing: unbounded PCollections 不能直接用于分组转换(比如你正在使用的
CombinePerKey
),所以你应该先set a non-global windowing function .
- 触发器: 您可以 set up a trigger 无界 PCollection,即使订阅中的数据是仍在流动。
这可以解释您所看到的行为,即同一管道在从本地文件(有界数据源)读取时工作,但在读取时不工作来自 Pub/Sub 订阅(这是一个 无界 数据源)。
因此,为了使用 Pub/Sub 订阅,您应该应用窗口或触发策略,以便 PCollections 中的数据可以在以下转换中得到正确处理。
编辑: 此外,正如@Arjun 发现的那样,可能需要通过以下选项在管道中启用流式处理使用以下命令设置适当的 arg 参数:
pipeline_options.view_as(StandardOptions).streaming = True
我正在使用 Python SDK 在 Apache Beam 中编写程序以从 Pub/Sub 文件中读取 JSON 文件的内容,并对接收到的字符串进行一些处理。这是程序中我从 Pub/Sub 中提取内容并进行处理的部分:
with beam.Pipeline(options=PipelineOptions()) as pipeline:
lines = pipeline | beam.io.gcp.pubsub.ReadStringsFromPubSub(subscription=known_args.subscription)
lines_decoded = lines | beam.Map(lambda x: x.decode("base64"))
lines_split = lines_decoded | (beam.FlatMap(lambda x: x.split('\n')))
def json_to_tuple(jsonStr):
res = json.loads(jsonStr)
##printing retutn value
print (res['id'], res['messageSize'])
##
return (res['id'], res['messageSize'])
tupled = lines_split | beam.Map(json_to_tuple)
def printlines(line):
print line
result = tupled | beam.CombinePerKey(sum)
result | beam.Map(printlines)
虽然 运行 程序,代码在创建 PCollection tupled
后卡住了(之后没有代码行在执行)。奇怪的是,当我将源代码从 Pub/Sub 更改为包含 完全相同的 内容(使用 ReadFromText()
)的本地文件时,程序运行良好.
这种行为的原因可能是什么?
根据 Pub/Sub I/O 文档(Apache Beam docs and Dataflow Pub/Sub I/O docs),默认情况下,PubsubIO 转换与无界 PCollections 一起工作。
PCollections 可以是 bounded or unbounded:
- 有界:数据来自固定源,如文件。
- 无界:数据来自不断更新的来源,例如Pub/Sub订阅。
在对无界 PCollection 进行操作之前,您必须使用以下策略之一:
- Windowing: unbounded PCollections 不能直接用于分组转换(比如你正在使用的
CombinePerKey
),所以你应该先set a non-global windowing function . - 触发器: 您可以 set up a trigger 无界 PCollection,即使订阅中的数据是仍在流动。
这可以解释您所看到的行为,即同一管道在从本地文件(有界数据源)读取时工作,但在读取时不工作来自 Pub/Sub 订阅(这是一个 无界 数据源)。
因此,为了使用 Pub/Sub 订阅,您应该应用窗口或触发策略,以便 PCollections 中的数据可以在以下转换中得到正确处理。
编辑: 此外,正如@Arjun 发现的那样,可能需要通过以下选项在管道中启用流式处理使用以下命令设置适当的 arg 参数:
pipeline_options.view_as(StandardOptions).streaming = True