Flink 把 Timers 和 State ttl 存储在哪里?
Where does Flink store Timers and State ttl?
我正在使用 Flink v1.13.2
许多进程函数使用registerProcessingTimeTimer
清除状态:
public class ProcessA ...
{
@Override
public void processElement(Object value, Context ctx, Collector<...> out) throws Exception
{
if (...)
{
ctx.timerService().registerProcessingTimeTimer(value.getTimestampMs() + 23232);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out)
{
state.clear();
}
}
许多过程函数使用StateTtlConfig
:
public class ProcessB extends...
{
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(15))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor descriptor = ...
descriptor.enableTimeToLive(ttlConfig);
}
@Override
public void processElement(...) throws Exception
{
}
}
我正在使用 RocksDB 进行状态管理。
问题:
- timerService创建的定时器会存放在哪里? (存储在RocksDB或任务内存中)
- 由 statettl 配置创建的状态生存时间将存储在哪里?
- 当我使用 timerService 或 statettl 时,是否有任何内容保存到内存中?
- 如果我有数百万个密钥,我应该选择哪种方式?
- 当我使用 timerService 时,创建数百万个键会导致内存不足异常吗?
- 当我使用 statettl 时,创建数百万个键可能会导致内存不足异常
timerService创建的定时器会存放在哪里? (存储在RocksDB或任务内存中)
默认情况下,在 RocksDB 中。您还可以选择将计时器保留在堆上,但除非数量很少,否则这是一个坏主意,因为检查点基于堆的计时器会阻塞主流处理线程,并且它们会增加垃圾收集器的压力。
statettl config 创建的状态生存时间将存储在哪里?
这将为每个状态项添加一个 long(在状态后端,因此在 RocksDB 中)。
当我使用timerService或者statettl的时候,有没有存入内存的东西?
如果您将 RocksDB 同时用于状态和计时器,则不会。
如果我有数百万个密钥,我应该选择哪种方式?
将你的计时器保存在 RocksDB 中。
使用timerService时创建数百万个键会导致内存不足异常?
当我使用 statettl?
时,创建数百万个密钥会导致内存不足异常
无论您在其中存储什么,RocksDB 总是可能出现内存不足的异常;本机库在分配给它的内存中并不总是表现良好。但它不应该以无限制的方式增长,你对定时器和状态 TTL 所做的这些选择不应该有任何区别。
在 Flink 1.14 中进行了改进(通过升级到更新版本的 RocksDB),但仍然存在一些问题。在最坏的情况下,您可能需要将 OS 中的实际进程内存限制设置为大于您告诉 Flink 它可以使用的限制。
我正在使用 Flink v1.13.2
许多进程函数使用registerProcessingTimeTimer
清除状态:
public class ProcessA ...
{
@Override
public void processElement(Object value, Context ctx, Collector<...> out) throws Exception
{
if (...)
{
ctx.timerService().registerProcessingTimeTimer(value.getTimestampMs() + 23232);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<ValidationResult> out)
{
state.clear();
}
}
许多过程函数使用StateTtlConfig
:
public class ProcessB extends...
{
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(15))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor descriptor = ...
descriptor.enableTimeToLive(ttlConfig);
}
@Override
public void processElement(...) throws Exception
{
}
}
我正在使用 RocksDB 进行状态管理。
问题:
- timerService创建的定时器会存放在哪里? (存储在RocksDB或任务内存中)
- 由 statettl 配置创建的状态生存时间将存储在哪里?
- 当我使用 timerService 或 statettl 时,是否有任何内容保存到内存中?
- 如果我有数百万个密钥,我应该选择哪种方式?
- 当我使用 timerService 时,创建数百万个键会导致内存不足异常吗?
- 当我使用 statettl 时,创建数百万个键可能会导致内存不足异常
timerService创建的定时器会存放在哪里? (存储在RocksDB或任务内存中)
默认情况下,在 RocksDB 中。您还可以选择将计时器保留在堆上,但除非数量很少,否则这是一个坏主意,因为检查点基于堆的计时器会阻塞主流处理线程,并且它们会增加垃圾收集器的压力。
statettl config 创建的状态生存时间将存储在哪里?
这将为每个状态项添加一个 long(在状态后端,因此在 RocksDB 中)。
当我使用timerService或者statettl的时候,有没有存入内存的东西?
如果您将 RocksDB 同时用于状态和计时器,则不会。
如果我有数百万个密钥,我应该选择哪种方式?
将你的计时器保存在 RocksDB 中。
使用timerService时创建数百万个键会导致内存不足异常? 当我使用 statettl?
时,创建数百万个密钥会导致内存不足异常无论您在其中存储什么,RocksDB 总是可能出现内存不足的异常;本机库在分配给它的内存中并不总是表现良好。但它不应该以无限制的方式增长,你对定时器和状态 TTL 所做的这些选择不应该有任何区别。
在 Flink 1.14 中进行了改进(通过升级到更新版本的 RocksDB),但仍然存在一些问题。在最坏的情况下,您可能需要将 OS 中的实际进程内存限制设置为大于您告诉 Flink 它可以使用的限制。