Dataflow job doesn't emit messages after GroupByKey()

I have a streaming Dataflow pipeline that writes to BQ, and I want to window all the failed rows and do some further analysis. The pipeline looks like the code below. I get all the error messages in step 2, but everything gets stuck at beam.GroupByKey(); nothing moves downstream after that. Does anybody know how to fix this?

import apache_beam as beam

data = (
    pipeline  # the Beam Pipeline object, omitted in the original snippet
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
        subscription=options.input_subscription, with_attributes=True)
    ...
    | "write to BQ" >> beam.io.WriteToBigQuery(
        table=f"{options.bq_dataset}.{options.bq_table}",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER
    )
)

(
    data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
    | "Failed Rows" >> beam.ParDo(BadRows(options.bq_dataset, 'table'))
)

from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        super().__init__()
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its publish timestamp.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
            # If the windowed elements do not fit into memory, consider using beam.util.BatchElements.
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

Also, I don't know if it's relevant, but beam.DoFn.TimestampParam inside my GroupWindowsIntoBatches has an invalid (negative) timestamp.
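
A quick way to confirm what Beam thinks each element's timestamp is, is a small debugging DoFn that reads beam.DoFn.TimestampParam. This is only a sketch; LogTimestamp is a made-up name, not part of the pipeline above:

import apache_beam as beam

class LogTimestamp(beam.DoFn):
    """Debugging-only DoFn that prints the timestamp Beam assigned to each element."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # Elements that were never stamped surface as a huge negative value
        # (near MIN_TIMESTAMP), which matches the symptom described above.
        print(f"timestamp={timestamp}, element={element}")
        yield element

Dropping | "Log TS" >> beam.ParDo(LogTimestamp()) in front of the GroupByKey makes the missing timestamps visible in the worker logs.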

OK, the problem was that the messages coming out of BigQuery's FAILED_ROWS output have no timestamp. Adding | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, time.time())) seems to fix the group by.

import time

import apache_beam as beam
from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        super().__init__()
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Added this line: FAILED_ROWS elements carry no timestamp, so
            # stamp each one with wall-clock time before windowing.
            | "Add Timestamps" >> beam.Map(lambda x: window.TimestampedValue(x, time.time()))
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
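
Two notes on the fix, for anyone reusing it. First, time.time() stamps the rows with processing time, so the windows reflect when the insert failed rather than when the message was originally published. Second, the dummy None key funnels every row in a window through a single key; if bounded-size batches are enough and strict one-batch-per-window grouping is not required, beam.BatchElements (mentioned in a comment in the question's code) works without the key/GroupByKey dance. A rough sketch under those assumptions, with batch_failed_rows as a made-up helper name:

import apache_beam as beam
from apache_beam.transforms import window


def batch_failed_rows(pcoll, window_size_secs):
    """Sketch: window already-timestamped failed rows, then batch them without a dummy key."""
    return (
        pcoll
        | "Window" >> beam.WindowInto(window.FixedWindows(window_size_secs))
        # BatchElements buffers elements and emits them as lists, so no
        # (None, elem) keying and no GroupByKey shuffle are needed.
        | "Batch" >> beam.BatchElements(min_batch_size=1, max_batch_size=500)
    )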