Flink missing windows generated on some partitions

I am trying to write a small Flink dataflow to learn more about how it works, and I am hitting a strange situation where I get inconsistent output every time I run it. Sometimes records that I am expecting are missing. Keep in mind that this is just a toy example I built to learn the concepts of the DataStream API.

I have a dataset of roughly 7,600 rows in CSV format that looks like this:

Date,Country,City,Specie,count,min,max,median,variance
28/06/2021,GR,Athens,no2,116,0.5,58.9,5.5,2824.39
28/06/2021,GR,Athens,wind-speed,133,0.1,11.2,3,96.69
28/06/2021,GR,Athens,dew,24,14,20,18,35.92
28/06/2021,GR,Athens,temperature,141,24.4,38.4,30.5,123.18
28/06/2021,GR,Athens,pm25,116,34,85,68,702.29

The full dataset is here: https://pastebin.com/rknnRnPc

There are no special characters or quoting, so a simple string split will work fine.

The date range for each city goes from 28/06/2021 to 03/10/2021.

I am reading it using the DataStream API:

final DataStream<String> source = env.readTextFile("data.csv");

Each line is mapped to a simple POJO like this:

import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CityMetric {

    private static final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    private final LocalDate localDate;
    private final String country;
    private final String city;
    private final String reading;
    private final int count;
    private final double min;
    private final double max;
    private final double median;
    private final double variance;

    private CityMetric(LocalDate localDate, String country, String city, String reading, int count, double min, double max, double median, double variance) {
        this.localDate = localDate;
        this.country = country;
        this.city = city;
        this.reading = reading;
        this.count = count;
        this.min = min;
        this.max = max;
        this.median = median;
        this.variance = variance;
    }

    public static CityMetric fromArray(String[] arr) {
        LocalDate date = LocalDate.parse(arr[0], dateFormatter);
        int count = Integer.parseInt(arr[4]);
        double min = Double.parseDouble(arr[5]);
        double max = Double.parseDouble(arr[6]);
        double median = Double.parseDouble(arr[7]);
        double variance = Double.parseDouble(arr[8]);

        return new CityMetric(date, arr[1], arr[2], arr[3], count, min, max, median, variance);
    }

    public long getTimestamp() {
        return getLocalDate()
                .atStartOfDay()
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    //getters follow
}

The records are all in date order, so I am using this to set the event time and watermarks:

   final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
            WatermarkStrategy.<CityMetric>forMonotonousTimestamps()  //we know they are sorted by time
                    .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

I have a StreamingFileSink on a Tuple4 that outputs the date range, the city, and the average value:

  final StreamingFileSink<Tuple4<LocalDate, LocalDate, String, Double>> fileSink =
        StreamingFileSink.forRowFormat(
                new Path("airquality"),
                new SimpleStringEncoder<Tuple4<LocalDate, LocalDate, String, Double>>("UTF-8"))
            .build();

And finally I have the dataflow as follows:

 source
        .map(s -> s.split(",")) //split the CSV row into its fields
        .filter(arr -> !arr[0].startsWith("Date")) // if it starts with Date it means it is the top header
        .map(CityMetric::fromArray)  //create the object from the fields
        .assignTimestampsAndWatermarks(cityMetricWatermarkStrategy) // we use the date as the event time
        .filter(cm -> cm.getReading().equals("pm25")) // we want air quality of fine particulate matter pm2.5
        .keyBy(CityMetric::getCity) // partition by city name
        .window(TumblingEventTimeWindows.of(Time.days(7))) //windows of 7 days
        .aggregate(new CityAverageAggregate()) // average the values
        .name("cityair")
        .addSink(fileSink); //output each partition to a file

CityAverageAggregate just accumulates the sum and count, and keeps track of the earliest and latest dates of the range it has covered.

import java.time.LocalDate;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple4;

public class CityAverageAggregate
    implements AggregateFunction<
        CityMetric, CityAverageAggregate.AverageAccumulator, Tuple4<LocalDate, LocalDate, String, Double>> {

  @Override
  public AverageAccumulator createAccumulator() {
    return new AverageAccumulator();
  }

  @Override
  public AverageAccumulator add(CityMetric cityMetric, AverageAccumulator averageAccumulator) {
    return averageAccumulator.add(
        cityMetric.getCity(), cityMetric.getLocalDate(), cityMetric.getMedian());
  }

  @Override
  public Tuple4<LocalDate, LocalDate, String, Double> getResult(
      AverageAccumulator averageAccumulator) {
    return Tuple4.of(
        averageAccumulator.getStart(),
        averageAccumulator.getEnd(),
        averageAccumulator.getCity(),
        averageAccumulator.average());
  }

  @Override
  public AverageAccumulator merge(AverageAccumulator acc1, AverageAccumulator acc2) {
    return acc1.merge(acc2);
  }

  public static class AverageAccumulator {
    private final String city;
    private final LocalDate start;
    private final LocalDate end;
    private final long count;
    private final double sum;

    public AverageAccumulator() {
      city = "";
      count = 0;
      sum = 0;
      start = null;
      end = null;
    }

    AverageAccumulator(String city, LocalDate start, LocalDate end, long count, double sum) {
      this.city = city;
      this.count = count;
      this.sum = sum;
      this.start = start;
      this.end = end;
    }

    public AverageAccumulator add(String city, LocalDate eventDate, double value) {
      //make sure our dataflow is correct and we are summing data from the same city
      if (!this.city.equals("") && !this.city.equals(city)) {
        throw new IllegalArgumentException(city + " does not match " + this.city);
      }

      return new AverageAccumulator(
          city,
          earliest(this.start, eventDate),
          latest(this.end, eventDate),
          this.count + 1,
          this.sum + value);
    }

    public AverageAccumulator merge(AverageAccumulator that) {
      LocalDate mergedStart = earliest(this.start, that.start);
      LocalDate mergedEnd = latest(this.end, that.end);
      return new AverageAccumulator(
          this.city, mergedStart, mergedEnd, this.count + that.count, this.sum + that.sum);
    }

    private LocalDate earliest(LocalDate d1, LocalDate d2) {
      if (d1 == null) {
        return d2;
      } else if (d2 == null) {
        return d1;
      } else {
        return d1.isBefore(d2) ? d1 : d2;
      }
    }

    private LocalDate latest(LocalDate d1, LocalDate d2) {
      if (d1 == null) {
        return d2;
      } else if (d2 == null) {
        return d1;
      } else {
        return d1.isAfter(d2) ? d1 : d2;
      }
    }

    public double average() {
      return sum / count;
    }

    public String getCity() {
      return city;
    }

    public LocalDate getStart() {
      return start;
    }

    public LocalDate getEnd() {
      return end;
    }
  }
}

The problem:

The problem I am facing is that sometimes I do not get all the windows I am expecting. This does not always happen, and sometimes consecutive runs produce different output, so I suspect there is a race condition somewhere.

For example, in one of the partition file outputs I sometimes get:

(2021-07-12,2021-07-14,Belgrade,56.666666666666664)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

While sometimes I get the full set:

(2021-06-28,2021-06-30,Belgrade,48.666666666666664)
(2021-07-01,2021-07-07,Belgrade,41.142857142857146)
(2021-07-08,2021-07-14,Belgrade,52.857142857142854)
(2021-07-15,2021-07-21,Belgrade,56.0)
(2021-07-22,2021-07-28,Belgrade,57.285714285714285)
(2021-07-29,2021-08-04,Belgrade,43.57142857142857)
(2021-08-05,2021-08-11,Belgrade,35.42857142857143)
(2021-08-12,2021-08-18,Belgrade,43.42857142857143)
(2021-08-19,2021-08-25,Belgrade,36.857142857142854)
(2021-08-26,2021-09-01,Belgrade,50.285714285714285)
(2021-09-02,2021-09-08,Belgrade,46.285714285714285)
(2021-09-09,2021-09-15,Belgrade,54.857142857142854)
(2021-09-16,2021-09-22,Belgrade,56.714285714285715)
(2021-09-23,2021-09-29,Belgrade,59.285714285714285)
(2021-09-30,2021-10-03,Belgrade,61.5)

Is there anything obviously wrong in my dataflow pipeline? I cannot figure out why this happens. It does not always happen to the same city either.

What could be going on?

Update:

It seems that when I disable the watermarks the problem no longer occurs. I changed the WatermarkStrategy to the following:

    final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
            WatermarkStrategy.<CityMetric>noWatermarks()  
                             .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());

and so far I have been getting consistent results. When I checked the documentation, it says:

static WatermarkStrategy noWatermarks()

Creates a watermark strategy that generates no watermarks at all. This may be useful in scenarios that do pure processing-time based stream processing.

But I am not doing processing-time based stream processing; I am doing event-time processing.

Why does forMonotonousTimestamps() produce the strange behaviour I am seeing? My timestamps do increase monotonically (if they did not, the noWatermarks strategy would not work), yet somehow changing this does not work for my scenario.

Is there something I am missing about how Flink works?

Flink doesn't support per-key watermarking. Each parallel task generates watermarks independently, based on observing all of the events flowing through that task.

So the reason this isn't working with the forMonotonousTimestamps watermark strategy is that the input is not actually in order by timestamp. It is sorted by time within each city, but not globally. This then results in some records being late, but unpredictably so, depending on exactly when the watermarks are generated. Those late events are ignored by the windows that should have contained them.
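One way to see this happening is to route the dropped records to a side output instead of losing them silently. This is a hypothetical sketch, not part of the original job; cityMetrics stands for the stream of CityMetric after timestamp assignment:

final OutputTag<CityMetric> lateTag = new OutputTag<CityMetric>("late-metrics") {}; //anonymous subclass captures the type

final SingleOutputStreamOperator<Tuple4<LocalDate, LocalDate, String, Double>> averages =
    cityMetrics
        .keyBy(CityMetric::getCity)
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .sideOutputLateData(lateTag) //late records are emitted here instead of being dropped
        .aggregate(new CityAverageAggregate());

averages.getSideOutput(lateTag).print(); //shows exactly which readings arrived behind the watermark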

You can address this in several ways:

(1) Use a forBoundedOutOfOrderness watermark strategy, with a duration sufficient to account for the actual out-of-order-ness in the dataset. Given that the data looks like this:

03/10/2021,GR,Athens,pressure,60,1017.9,1040.6,1020.9,542.4
28/06/2021,US,Atlanta,co,24,1.4,7.3,2.2,19.05

this will require an out-of-order-ness duration of approximately 100 days. A sketch of that strategy is shown below.
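A minimal sketch, reusing the timestamp assigner from the question (java.time.Duration is assumed to be imported, and the 100-day bound is an assumption sized to this dataset):

    final WatermarkStrategy<CityMetric> cityMetricWatermarkStrategy =
            WatermarkStrategy.<CityMetric>forBoundedOutOfOrderness(Duration.ofDays(100)) //tolerate up to ~100 days of out-of-order-ness
                    .withTimestampAssigner((cityMetric, l) -> cityMetric.getTimestamp());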

(2) Configure the windows to have sufficient allowed lateness. This will result in some windows being triggered more than once: once when the watermark indicates they can close, and again each time a late event is added to the window. See the sketch after this item.
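A sketch of how that could look in the existing pipeline; the 100-day allowance is again an assumption sized to the dataset above:

        .keyBy(CityMetric::getCity)
        .window(TumblingEventTimeWindows.of(Time.days(7)))
        .allowedLateness(Time.days(100)) //keep window state around so late events can still update their window
        .aggregate(new CityAverageAggregate())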

(3) Use the noWatermarks strategy (as in the update above). This will result in the job only producing results when it reaches the end of its input file(s). For a continuous streaming job this wouldn't work, but with finite (bounded) input it does.

(4) Run the job in RuntimeExecutionMode.BATCH mode. Then the job will only produce results at the end, after all of its input has been consumed. This runs the job with a more optimized runtime designed for batch workloads, but the outcome should be the same as with (3).
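Assuming env is the StreamExecutionEnvironment from the question, that is a one-line change:

env.setRuntimeMode(RuntimeExecutionMode.BATCH); //org.apache.flink.api.common.RuntimeExecutionMode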

(5) Change the input so that it isn't out-of-order.