Flink co group outer join 因高背压而失败

Flink co group outer join fails with High Backpressure

我在 Flink 中有两个流 stream1 每秒有 70000 条记录并且 stream2 可能有也可能没有数据。

// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
    environment
        .addSource(createHFAConsumer())
        .name("hfa source");

SingleOutputStreamOperator<EVWindow> stream2 = environment
        .addSource(createHFDConsumer())
        .name("hfd source");
    
DataStream<Message> pStream =
        stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
            .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());

当两个 Streams 都有数据时,这工作得很好,但是当 stream2 没有数据时,作业失败,背压非常高。 CPU 利用率也飙升了 200%。

在这种情况下如何处理外连接

我认为问题在于空闲流中缺少水印阻碍了整体水印。每当连接多个流时,生成的水印是传入水印的最小值。这可能会导致您遇到的问题。

您有两个选择:

  1. stream2 的水印设置为 Watermark.MAX_WATERMARK,从而让 stream1 完全控制水印。
  2. 以某种方式检测到 stream2 处于空闲状态,并在没有事件的情况下人为地推进水印。这里是 an example.

感谢David Anderson指点

RCA :

主要问题出现在我尝试围绕我的流创建 Tumbling Window 时。

根据Flink Documentation

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness

由于 stream2 没有传入数据,因此 window 从未实现。正如大卫指出的那样

Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks

这意味着 flink 在等待 stream2 时正在缓冲来自 stream1 的数据,最终会导致高背压并最终导致 OOM。

解决方案 :

我创建了一个外部脚本来以所需的时间间隔将虚拟心跳消息发送到 Kafka 流 stream2,并在我的应用程序中添加了逻辑以忽略这些消息以进行计算。

这迫使 stream2stream1 推进水印,并且 window 被断章取义地删除了。

如前所述:

Whenever multiple streams are connected, the resulting watermark is the minimum of the incoming watermarks

which means flink was buffering data from stream1 while waiting for stream2 and would eventually result in High Backpressure and finally a OOM.

它适用于 DataStream<T> class returns CoGroupedStreams<T, T2>.

中的 coGroup() 方法

为了避免这种行为,我们可以使用 union(DataStream<T>... streams) 方法,该方法 returns 一个简单的 DataStream<T> 水印将像在通常的流中一样前进。

我们需要解决的唯一问题是为两个流设置一个通用模式 (class)。我们可以对两个字段使用一些聚合 class:

public class Aggregator {

  private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
  private EVWindow evWindow;

  public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
    this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
  }

  public Aggregator(EVWindow evWindow) {
    this.evWindow = evWindow;
  }

  public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
    return flatHighFrequencyAnalog;
  }

  public EVWindow getEVWindow() {
    return evWindow;
  }
}

此外,更通用的方法是使用 Either<L, R> class from org.apache.flink.types.

让我们总结一下最后的内容:

SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream1 =
    environment
        .addSource(createHFAConsumer())
        .map(hfa -> Either.Left(hfa));

SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream2 = 
    environment
        .addSource(createHFDConsumer())
        .map(hfd -> Either.Right(hfd));
    
DataStream<Message> pStream =
        stream1
          .union(stream2)
          .assignTimestampsAndWatermarks(
              WatermarkStrategy
                  .<Either<EVWindow, FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                    ofSeconds(MAX_OUT_OF_ORDERNESS))
                .withTimestampAssigner((input, timestamp) -> input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
          .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
          .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
          .process(new ProcessWindowFunction());

在处理函数中获取不同的集合

List<EVWindow> evWindows =
        Streams.stream(elements)
            .filter(Either::isLeft)
            .map(Either::left)
            .collect(Collectors.toList());

List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
        Streams.stream(elements)
            .filter(Either::isRight)
            .map(Either::right)
            .collect(Collectors.toList());