Dataflow job doesn't emit messages after GroupByKey()
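I have a streaming Dataflow pipeline that writes to BQ, and I want to window all the failed rows and do some further analysis. The pipeline looks like this. I receive all the error messages in the 2nd step, but all of them get stuck at beam.GroupByKey(). Nothing moves downstream after that. Does anyone know how to fix this?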
data = (
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=options.input_subscription,
                                                       with_attributes=True)
    ...
    | "write to BQ" >> beam.io.WriteToBigQuery(
        table=f"{options.bq_dataset}.{options.bq_table}",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER
    )
)
(
    data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
    | f"Failed Rows for " >> beam.ParDo(BadRows(options.bq_dataset, 'table'))
)
and
class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its publish timestamp.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(10))
            # If the windowed elements do not fit into memory please consider using `beam.util.BatchElements`.
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
Also, I don't know if it is relevant, but the beam.DoFn.TimestampParam inside my GroupWindowsIntoBatches has an invalid (negative) timestamp.
OK, the issue was that the messages coming from BigQuery FAILED_ROWS have no timestamp. Adding | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time())) before the windowing step seems to fix the group by.
import time

import apache_beam as beam
from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Added this line: stamp each failed row with the current wall-clock time.
            | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time()))
            # Note: the window length is hard-coded to 30 seconds; self.window_size is not used here.
            | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(30))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
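To see why the missing timestamps matter, it may help to look at the window arithmetic on its own. The sketch below is plain Python, not Beam code: fixed_window is a hypothetical helper that follows the usual fixed-window rule (round the timestamp down to a multiple of the window size), and the negative value is an arbitrary stand-in for the invalid timestamp mentioned above, not the value Beam actually reports. It only shows which window each element would be assigned to; how a given runner then treats a window that far in the past is not covered here.

```python
import time


def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) window, in seconds, that contains timestamp."""
    # Round the timestamp down to the nearest window boundary.
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# Whole seconds keep the arithmetic exact for this illustration.
now = int(time.time())

# Hypothetical stand-in for an element with an invalid (negative) timestamp.
untimed = -9_000_000_000_007

stamped_window = fixed_window(now, 30)
untimed_window = fixed_window(untimed, 30)

print("stamped with time.time():", stamped_window)
print("negative timestamp:      ", untimed_window)
```

The element stamped with the current time falls into the 30-second window that contains now, while the element with the negative timestamp is assigned to a window that ended long before the present. That is consistent with the observation that adding explicit timestamps before WindowInto made the GroupByKey produce output.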