Apache Beam Cloud Dataflow Streaming Stuck Side 输入

Apache Beam Cloud Dataflow Streaming Stuck Side Input

我目前正在 GCP Dataflow 中构建 PoC Apache Beam 管道。在这种情况下,我想使用来自 PubSub 的主要输入和来自 BigQuery 的辅助输入创建流式传输管道,并将处理后的数据存储回 BigQuery。

边管道代码

side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x:beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)

侧输入码功能

def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]

            break

    return payload

主管道代码

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" >> beam.Map(transform_function)
    | "timestamping" >> beam.Map(lambda src: window.TimestampedValue(
        src,
        dt.datetime.fromisoformat(src["timestamp"]).timestamp()
    ))
    | "windowing" >> beam.WindowInto(window.FixedWindows(30))
)

final_pipeline = (
    main_pipeline
    | "enrich data" >> beam.Map(enrich_payload, equipments=beam.pvalue.AsIter(side_pipeline))
    | "store" >> beam.io.WriteToBigQuery(bq_table)
)

result = p.run()
result.wait_until_finish()

将其部署到 Dataflow 后,一切正常,没有错误。但后来我注意到 enrich data 步骤有两个节点而不是一个。

而且,如您所见,侧输入卡住了,它在输入集合中有 Elements Added 21 个计数,在输出集合中 Elements Added 中有 - 值。

您可以找到完整的管道代码here and mock pubsub publisher here

我已经遵循了这些文档中的所有说明:

还是发现了这个错误。请帮我。谢谢!

这里有一个工作示例:

mytopic = ""
sql = "SELECT station_id, CURRENT_TIMESTAMP() timestamp FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations` LIMIT 10"

def to_bqrequest(e, sql):
    from apache_beam.io import ReadFromBigQueryRequest
    yield ReadFromBigQueryRequest(query=sql)
     

def merge(e, side):
    for i in side:
        yield f"Main {e.decode('utf-8')} Side {i}"

pubsub = p | "Read PubSub topic" >> ReadFromPubSub(topic=mytopic)

side_pcol = (p | PeriodicImpulse(fire_interval=300, apply_windowing=False)
               | "ApplyGlobalWindow" >> WindowInto(window.GlobalWindows(),
                                           trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
                                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
               | "To BQ Request" >> ParDo(to_bqrequest, sql=sql)
               | ReadAllFromBigQuery()
            )

final = (pubsub | "Merge" >> ParDo(merge, side=beam.pvalue.AsList(side_pcol))
                | Map(logging.info)
        )                    
    
p.run()

请注意,这使用了 GlobalWindow(因此两个输入具有相同的 window)。我使用了处理时间触发器,以便窗格包含多行。 5 是任意选择的,使用 1 也可以。

请注意,匹配侧输入和主输入之间的数据是不确定的,您可能会看到较旧的发射窗格的值波动。

理论上,使用 FixedWindows 应该可以解决这个问题,但我无法让 FixedWindows 工作。