Flink missing windows generated on some partitions
I am trying to write a small Flink dataflow to understand more about how it works, but I'm running into a strange situation where each time I run it, I get inconsistent output. Sometimes some records that I'm expecting are missing. Keep in mind this is just a toy example I'm building to learn the concepts of the DataStream API.
I have a dataset of around 7,600 rows in CSV format that look like this:
Date,Country,City,Specie,count,min,max,median,variance
28/06/2021,GR,Athens,no2,116,0.5,58.9,5.5,2824.39
28/06/2021,GR,Athens,wind-speed,133,0.1,11.2,3,96.69
28/06/2021,GR,Athens,dew,24,14,20,18,35.92
28/06/2021,GR,Athens,temperature,141,24.4,38.4,30.5,123.18
28/06/2021,GR,Athens,pm25,116,34,85,68,702.29
The full dataset is here: https://pastebin.com/rknnRnPc
There are no special characters or quoting, so a simple string split will do fine.
The date range for each city runs from 28/06/2021 to 03/10/2021.
I am reading it using the DataStream API:
final DataStream<String> source = env.readTextFile("data.csv");
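(Here env is assumed to be a standard StreamExecutionEnvironment, e.g.:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
)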
Every line is mapped to a simple POJO as follows:
public class CityMetric {

    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    private final LocalDate localDate;
    private final String country;
    private final String city;
    private final String reading;
    private final int count;
    private final double min;
    private final double max;
    private final double median;
    private final double variance;

    private CityMetric(LocalDate localDate, String country, String city, String reading,
                       int count, double min, double max, double median, double variance) {
        this.localDate = localDate;
        this.country = country;
        this.city = city;
        this.reading = reading;
        this.count = count;
        this.min = min;
        this.max = max;
        this.median = median;
        this.variance = variance;
    }

    public static CityMetric fromArray(String[] arr) {
        LocalDate date = LocalDate.parse(arr[0], dateFormatter);
        int count = Integer.parseInt(arr[4]);
        double min = Double.parseDouble(arr[5]);
        double max = Double.parseDouble(arr[6]);
        double median = Double.parseDouble(arr[7]);
        double variance = Double.parseDouble(arr[8]);
        return new CityMetric(date, arr[1], arr[2], arr[3], count, min, max, median, variance);
    }

    public long getTimestamp() {
        return getLocalDate()
                .atStartOfDay()
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    //getters follow
The records all come in date order, so I have this to set the event time and watermark:
final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
    WatermarkStrategy.<CityMetric>forMonotonousTimestamps() //we know they are sorted by time
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());
I have a StreamingFileSink on a Tuple4 to output the date range, the city, and the average:
final StreamingFileSink<Tuple4<LocalDate, LocalDate, String, Double>> fileSink =
    StreamingFileSink.forRowFormat(
            new Path("airquality"),
            new SimpleStringEncoder<Tuple4<LocalDate, LocalDate, String, Double>>("UTF-8"))
        .build();
And finally I have the dataflow as follows:
source
    .map(s -> s.split(",")) //split the CSV row into its fields
    .filter(arr -> !arr[0].startsWith("Date")) // if it starts with Date it means it is the top header
    .map(CityMetric::fromArray) //create the object from the fields
    .assignTimestampsAndWatermarks(cityMetricWatermarkStrategy) // we use the date as the event time
    .filter(cm -> cm.getReading().equals("pm25")) // we want air quality of fine particulate matter pm2.5
    .keyBy(CityMetric::getCity) // partition by city name
    .window(TumblingEventTimeWindows.of(Time.days(7))) //windows of 7 days
    .aggregate(new CityAverageAggregate()) // average the values
    .name("cityair")
    .addSink(fileSink); //output each partition to a file
The CityAverageAggregate just accumulates the sum and count, and keeps track of the earliest and latest dates of the range it is covering.
public class CityAverageAggregate
        implements AggregateFunction<
                CityMetric, CityAverageAggregate.AverageAccumulator, Tuple4<LocalDate, LocalDate, String, Double>> {

    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    @Override
    public AverageAccumulator add(CityMetric cityMetric, AverageAccumulator averageAccumulator) {
        return averageAccumulator.add(
                cityMetric.getCity(), cityMetric.getLocalDate(), cityMetric.getMedian());
    }

    @Override
    public Tuple4<LocalDate, LocalDate, String, Double> getResult(
            AverageAccumulator averageAccumulator) {
        return Tuple4.of(
                averageAccumulator.getStart(),
                averageAccumulator.getEnd(),
                averageAccumulator.getCity(),
                averageAccumulator.average());
    }

    @Override
    public AverageAccumulator merge(AverageAccumulator acc1, AverageAccumulator acc2) {
        return acc1.merge(acc2);
    }

    public static class AverageAccumulator {
        private final String city;
        private final LocalDate start;
        private final LocalDate end;
        private final long count;
        private final double sum;

        public AverageAccumulator() {
            city = "";
            count = 0;
            sum = 0;
            start = null;
            end = null;
        }

        AverageAccumulator(String city, LocalDate start, LocalDate end, long count, double sum) {
            this.city = city;
            this.count = count;
            this.sum = sum;
            this.start = start;
            this.end = end;
        }

        public AverageAccumulator add(String city, LocalDate eventDate, double value) {
            //make sure our dataflow is correct and we are summing data from the same city
            if (!this.city.equals("") && !this.city.equals(city)) {
                throw new IllegalArgumentException(city + " does not match " + this.city);
            }
            return new AverageAccumulator(
                    city,
                    earliest(this.start, eventDate),
                    latest(this.end, eventDate),
                    this.count + 1,
                    this.sum + value);
        }

        public AverageAccumulator merge(AverageAccumulator that) {
            LocalDate mergedStart = earliest(this.start, that.start);
            LocalDate mergedEnd = latest(this.end, that.end);
            return new AverageAccumulator(
                    this.city, mergedStart, mergedEnd, this.count + that.count, this.sum + that.sum);
        }

        private LocalDate earliest(LocalDate d1, LocalDate d2) {
            if (d1 == null) {
                return d2;
            } else if (d2 == null) {
                return d1;
            } else {
                return d1.isBefore(d2) ? d1 : d2;
            }
        }

        private LocalDate latest(LocalDate d1, LocalDate d2) {
            if (d1 == null) {
                return d2;
            } else if (d2 == null) {
                return d1;
            } else {
                return d1.isAfter(d2) ? d1 : d2;
            }
        }

        public double average() {
            return sum / count;
        }

        public String getCity() {
            return city;
        }

        public LocalDate getStart() {
            return start;
        }

        public LocalDate getEnd() {
            return end;
        }
    }
}
The problem:
The problem I'm facing is that sometimes I don't get all of the windows I'm expecting. This doesn't always happen, and consecutive runs can output different results, so I suspect there is some race condition somewhere.
For example, in one of the partition file outputs I sometimes get:
(2021-07-12,2021-07-14,Belgrade,56.666666666666664)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)
While sometimes I get the full set:
(2021-06-28,2021-06-30,Belgrade,48.666666666666664)
(2021-07-01,2021-07-07,Belgrade,41.142857142857146)
(2021-07-08,2021-07-14,Belgrade,52.857142857142854)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)
Is there anything evidently wrong in my dataflow pipeline? I can't figure out why this would happen, and it doesn't always happen to the same city either.
What could be going on?
Update
It seems that when I disabled the watermarks, the problem no longer occurred. I changed the WatermarkStrategy to the following:
final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
    WatermarkStrategy.<CityMetric>noWatermarks()
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());
And so far I have been getting consistent results. When I checked the documentation, it says:
static WatermarkStrategy noWatermarks()
Creates a watermark strategy that generates no watermarks at all. This may be useful in scenarios that do pure processing-time based stream processing.
But I am not doing processing-time based stream processing, I am doing event-time processing.
Why would forMonotonousTimestamps() produce the strange behaviour I was seeing? My timestamps are indeed monotonically increasing (if they weren't, the noWatermarks strategy wouldn't work), yet somehow changing this doesn't suit my scenario.
Is there something I'm missing about how Flink works?
Flink doesn't support per-key watermarking. Each parallel task generates watermarks independently, based on observing all of the events flowing through that task.
So the reason this isn't working with the forMonotonousTimestamps watermark strategy is that the input is not actually in order by timestamp. It is sorted by time within each city, but not globally. This then results in some records being late, but unpredictably so, depending on exactly when the watermarks are generated. These late events are ignored by the windows that should have contained them.
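A quick way to confirm this diagnosis is to capture the dropped records with a side output. This is a sketch, not part of the original pipeline; the names cityMetrics and lateTag are made up for illustration:

// cityMetrics: the pm25-filtered, timestamped DataStream<CityMetric> from the question
final OutputTag<CityMetric> lateTag = new OutputTag<CityMetric>("late-pm25") {};

final SingleOutputStreamOperator<Tuple4<LocalDate, LocalDate, String, Double>> windowed =
    cityMetrics
        .keyBy(CityMetric::getCity)
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .sideOutputLateData(lateTag) // late records land here instead of being silently dropped
        .aggregate(new CityAverageAggregate());

windowed.getSideOutput(lateTag).print(); // prints every event that missed its window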
You can fix this in several ways:
(1) Use a forBoundedOutOfOrderness watermark strategy with a duration sufficient to account for the actual out-of-order-ness in the dataset. Given that the data contains adjacent lines like these:
03/10/2021,GR,Athens,pressure,60,1017.9,1040.6,1020.9,542.4
28/06/2021,US,Atlanta,co,24,1.4,7.3,2.2,19.05
this will require an out-of-order-ness duration of approximately 100 days.
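A sketch of that strategy, reusing the timestamp assigner from the question (Duration here is java.time.Duration):

final WatermarkStrategy<CityMetric> boundedStrategy =
    WatermarkStrategy.<CityMetric>forBoundedOutOfOrderness(Duration.ofDays(100)) // watermark trails the max timestamp seen by 100 days
        .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());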
(2) Configure the windows to have sufficient allowed lateness. This will result in some of the windows being triggered multiple times: once when the watermark indicates they can close, and again each time a late event is added to the window.
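In the pipeline from the question, that means one extra call on the window definition; a sketch, where the 100 days matches the out-of-order-ness estimate from option (1):

.keyBy(CityMetric::getCity)
.window(TumblingEventTimeWindows.of(Time.days(7)))
.allowedLateness(Time.days(100)) // window state is retained so late events re-trigger the window
.aggregate(new CityAverageAggregate())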
(3) Use the noWatermarks strategy. This will result in the job only producing results when it reaches the end of its input file(s). For a continuous streaming job this wouldn't be workable, but for finite (bounded) inputs it works.
(4) Run the job in RuntimeExecutionMode.BATCH mode. Then the job will only produce results at the end, once all of the input has been consumed. This will run the job with a more optimized runtime designed for batch workloads, but the outcome should be the same as with (3).
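Enabling batch execution is a one-line change on the execution environment; a sketch, assuming env is the job's StreamExecutionEnvironment:

env.setRuntimeMode(RuntimeExecutionMode.BATCH); // org.apache.flink.api.common.RuntimeExecutionMode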
(5) Change the input so that it isn't out-of-order.