有时在数据流运行器中 运行 管道时出现 IllegalStateException
Sometime getting IllegalStateException while running pipeline in dataflow runner
(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)
预期的原因是什么?
WindowFn 代码很简单:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
/**
*
*/
private static final long serialVersionUID = 1L;
private IntervalWindow assignWindow(AssignContext context) {
TableRow tableRow = (TableRow) context.element();
String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
String currentTime = DateUtil.getFormatedDate(new Date());
DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
.withZoneUTC();
Instant start_point = Instant.parse(timestamp, formatter);
Instant end_point = Instant.parse(currentTime, formatter);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
}
GroupByKey
的默认行为是输出带有时间戳的迭代器,该时间戳是 window 中允许的最大时间戳。对于您的 window,这是时间戳 13:53:08.718Z
。
元素的时间戳 13:53:08.725Z
不属于从 13:53:08.088Z
到 13:53:08.719Z
的 window。
您能否分享您的 WindowFn
以及任何 ParDo
调整时间戳的信息?
更新: 感谢您分享您的 WindowFn
。有几件事会给您带来麻烦。
1.分配的开始时间 window 不是基于元素的时间戳。
你提取元素的一列并根据context.element().get(BQConstants.LOG_TIME)
的值分配window(忽略转换和解析)。从您的错误消息来看,这似乎不是 context.timestamp()
的实际值,它是元素的事件时间戳。
相反,您应该编写 WindowFn
以使用 context.timestamp()
。您可以根据数据是否有界以不同的方式确保时间戳是您想要的:
- 如果您的数据是有界的,您可以使用
WithTimestamps
通过提取该字段来分配时间戳。
- 如果你的数据是无界的,源需要知道更多才能管理水印,所以配置取决于源。例如,
PubsubIO
从您指定的属性中读取时间戳。
2。分配的结束时间window以系统日期
为准
几个问题:
- 结束时间向下舍入,可能早于开始时间,导致无效window。
- 结束时间不确定。 Beam 中的一般期望是,您将主要根据元素的时间戳(必须落在 window 结束之前)然后根据元素本身确定性地分配 window。像这样分配一个不确定的 window 可能有无法预料的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对存档数据进行 运行 实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更具前瞻性的东西。
这里的目标是什么?您设置它只是为了提取动态目标的端点吗?如果是这样,我建议根据数据发生的时间而不是处理的时间对数据进行分区。
(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)
预期的原因是什么?
WindowFn 代码很简单:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
/**
*
*/
private static final long serialVersionUID = 1L;
private IntervalWindow assignWindow(AssignContext context) {
TableRow tableRow = (TableRow) context.element();
String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
String currentTime = DateUtil.getFormatedDate(new Date());
DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
.withZoneUTC();
Instant start_point = Instant.parse(timestamp, formatter);
Instant end_point = Instant.parse(currentTime, formatter);
return new IntervalWindow(start_point, end_point);
};
@Override
public Coder<IntervalWindow> windowCoder() {
return IntervalWindow.getCoder();
}
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
return Arrays.asList(assignWindow(c));
}
@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return false;
}
@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
throw new IllegalArgumentException(
"Attempted to get side input window for GlobalWindow from non-global WindowFn");
}
}
GroupByKey
的默认行为是输出带有时间戳的迭代器,该时间戳是 window 中允许的最大时间戳。对于您的 window,这是时间戳 13:53:08.718Z
。
元素的时间戳 13:53:08.725Z
不属于从 13:53:08.088Z
到 13:53:08.719Z
的 window。
您能否分享您的 WindowFn
以及任何 ParDo
调整时间戳的信息?
更新: 感谢您分享您的 WindowFn
。有几件事会给您带来麻烦。
1.分配的开始时间 window 不是基于元素的时间戳。
你提取元素的一列并根据context.element().get(BQConstants.LOG_TIME)
的值分配window(忽略转换和解析)。从您的错误消息来看,这似乎不是 context.timestamp()
的实际值,它是元素的事件时间戳。
相反,您应该编写 WindowFn
以使用 context.timestamp()
。您可以根据数据是否有界以不同的方式确保时间戳是您想要的:
- 如果您的数据是有界的,您可以使用
WithTimestamps
通过提取该字段来分配时间戳。 - 如果你的数据是无界的,源需要知道更多才能管理水印,所以配置取决于源。例如,
PubsubIO
从您指定的属性中读取时间戳。
2。分配的结束时间window以系统日期
为准几个问题:
- 结束时间向下舍入,可能早于开始时间,导致无效window。
- 结束时间不确定。 Beam 中的一般期望是,您将主要根据元素的时间戳(必须落在 window 结束之前)然后根据元素本身确定性地分配 window。像这样分配一个不确定的 window 可能有无法预料的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对存档数据进行 运行 实验,这可能会带来麻烦。这取决于您的用例,但您可能会考虑一些更具前瞻性的东西。
这里的目标是什么?您设置它只是为了提取动态目标的端点吗?如果是这样,我建议根据数据发生的时间而不是处理的时间对数据进行分区。