Flink - 如何解决过滤器功能中的背压?

Flink - How to resolve backpressure in Filter Function?

使用 rocksdb 的 flink v1.13.2,我在 grafana 中使用以下命令测量任务压力:sum by (task_name)(flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job="..."})

一些过滤器功能受到背压的影响(ProcessFilter_A、ProcessFilter_B、C、D)。但是以下流程功能运行良好(没有背压 - 流程和 mongodb sink-)。而且我不知道如何解决过滤器功能中的背压

可能是什么原因?

而且我想知道因为我在使用 rocksdb 聚合函数中的 KafkaObject 会存储在 rocksdb 中吗?

这是我的过滤器函数(遭受背压):

private void myStream(DataStream<KafkaObject> kafkaSource)
    {
       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )
                .name("ProcessFilter_A")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(2), Time.minutes(30)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup A"))
                .name("ProcessA")
                .addSink(new MongoDbSink())
                ;

       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )            
                .name("ProcessFilter_B")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1), Time.minutes(20)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup B"))
                .name("ProcessB")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )           
                .name("ProcessFilter_C")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30), Time.minutes(10)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup C"))
                .name("ProcessC")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )              
                .name("ProcessFilter_D")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup D"))
                .name("ProcessD")
                .addSink(new MongoDbSink())
                ;
    }

这里是聚合函数:

public class MyAggregateCounter implements AggregateFunction<KafkaObject, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>
{
    @Override
    public Tuple3<Long, Long, Long> createAccumulator()
    {
        return Tuple3.of(0L, 0L, 0L);
    }

    @Override
    public Tuple3<Long, Long, Long> add(KafkaObject value, Tuple3<Long, Long, Long> accumulator)
    {
        if (...)
        {
            accumulator.f0 += 1;
        }
        else if (...)
        {
            accumulator.f1 += 1;
        }
        else if (...)
        {
            accumulator.f2 += 1;
        }
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator)
    {
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> a, Tuple3<Long, Long, Long> b)
    {
        return Tuple3.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}

更新: 大卫是对的,问题是 mongodb。即使我无法将过滤器背压与 mongodb 联系起来(因为没有背压 mongodb sink 本身),在使用 NoSink 实现删除 mongoSink 后一切正常。

在 sink 函数的 open 方法中,我正在创建 N com.mongodb.MongoClient 个实例,其中 N 是并行度。现在我将 N 个实例更改为 每个任务管理器一个实例(使用单一模式)

现在即使看起来还可以,我相信 Flink 也需要预定义的 mongoSink(如 Kafka)。而且 Flink 还需要一些其他指标来找出真正的原因。(因为我永远不会发现问题是 MongoDB)

MongoDB 接收器是您在过滤器函数中观察到的背压的最可能原因。您可以通过用废弃的水槽替换水槽并检查是否消除背压来验证这一点。

有关示例,请参阅

至于降低背压,你可以使用 Flink 的 built-in backpressure monitoring and flame graphs 来确定瓶颈。

如果这样做,您可能会发现 MongoDB and/or Flink 需要扩展。您可能还会发现您的作业大部分时间都在执行 serialization/deserialization -- 优化序列化是提高吞吐量的最佳方法之一。