Flink 把 Timers 和 State ttl 存储在哪里?

Where does Flink store Timers and State ttl?

我正在使用 Flink v1.13.2

许多进程函数使用registerProcessingTimeTimer清除状态:

public class ProcessA ...
{
@Override
    public void processElement(Object value, Context ctx, Collector<...> out) throws Exception
    {
       
        if (...)
        {
            ctx.timerService().registerProcessingTimeTimer(value.getTimestampMs() + 23232);
        }
    }

@Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out)
    {
        state.clear();
    }
}

许多过程函数使用StateTtlConfig:


public class ProcessB extends...
{

    @Override
    public void open(Configuration parameters)
    {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor descriptor = ...
        descriptor.enableTimeToLive(ttlConfig);


    }

    @Override
    public void processElement(...) throws Exception
    {
    
    }

}

我正在使用 RocksDB 进行状态管理。

问题:

timerService创建的定时器会存放在哪里? (存储在RocksDB或任务内存中)

默认情况下,在 RocksDB 中。您还可以选择将计时器保留在堆上,但除非数量很少,否则这是一个坏主意,因为检查点基于堆的计时器会阻塞主流处理线程,并且它们会增加垃圾收集器的压力。

statettl config 创建的状态生存时间将存储在哪里?

这将为每个状态项添加一个 long(在状态后端,因此在 RocksDB 中)。

当我使用timerService或者statettl的时候,有没有存入内存的东西?

如果您将 RocksDB 同时用于状态和计时器,则不会。

如果我有数百万个密钥,我应该选择哪种方式?

将你的计时器保存在 RocksDB 中。

使用timerService时创建数百万个键会导致内存不足异常? 当我使用 statettl?

时,创建数百万个密钥会导致内存不足异常

无论您在其中存储什么,RocksDB 总是可能出现内存不足的异常;本机库在分配给它的内存中并不总是表现良好。但它不应该以无限制的方式增长,你对定时器和状态 TTL 所做的这些选择不应该有任何区别。

在 Flink 1.14 中进行了改进(通过升级到更新版本的 RocksDB),但仍然存在一些问题。在最坏的情况下,您可能需要将 OS 中的实际进程内存限制设置为大于您告诉 Flink 它可以使用的限制。