有时在数据流运行器中 运行 管道时出现 IllegalStateException

Sometime getting IllegalStateException while running pipeline in dataflow runner

(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)

预期的原因是什么?

WindowFn 代码很简单:

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private IntervalWindow assignWindow(AssignContext context) {
    TableRow tableRow = (TableRow) context.element();
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
    String currentTime = DateUtil.getFormatedDate(new Date());
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
            .withZoneUTC();
    Instant start_point = Instant.parse(timestamp, formatter);
    Instant end_point = Instant.parse(currentTime, formatter);

    return new IntervalWindow(start_point, end_point);
};

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
}

}

GroupByKey 的默认行为是输出带有时间戳的迭代器,该时间戳是 window 中允许的最大时间戳。对于您的 window,这是时间戳 13:53:08.718Z

元素的时间戳 13:53:08.725Z 不属于从 13:53:08.088Z13:53:08.719Z 的 window。

您能否分享您的 WindowFn 以及任何 ParDo 调整时间戳的信息?

更新: 感谢您分享您的 WindowFn。有几件事会给您带来麻烦。

1.分配的开始时间 window 不是基于元素的时间戳。

你提取元素的一列并根据context.element().get(BQConstants.LOG_TIME)的值分配window(忽略转换和解析)。从您的错误消息来看,这似乎不是 context.timestamp() 的实际值,它是元素的事件时间戳。

相反,您应该编写 WindowFn 以使用 context.timestamp()。您可以根据数据是否有界以不同的方式确保时间戳是您想要的:

  • 如果您的数据是有界的,您可以使用 WithTimestamps 通过提取该字段来分配时间戳。
  • 如果你的数据是无界的,源需要知道更多才能管理水印,所以配置取决于源。例如,PubsubIO 从您指定的属性中读取时间戳。

2。分配的结束时间window以系统日期

为准

几个问题:

  • 结束时间向下舍入,可能早于开始时间,导致无效window。
  • 结束时间不确定。 Beam 中的一般期望是,您将主要根据元素的时间戳(必须落在 window 结束之前)然后根据元素本身确定性地分配 window。像这样分配一个不确定的 window 可能有无法预料的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对存档数据进行 运行 实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更具前瞻性的东西。

这里的目标是什么?您设置它只是为了提取动态目标的端点吗?如果是这样,我建议根据数据发生的时间而不是处理的时间对数据进行分区。