Flink rocksdb 压实过滤器不工作
Flink rocksdb compaction filter not working
我有一个 Flink 集群。我启用了压缩过滤器并使用状态 TTL。但是 Rocksdb 压缩过滤器不会从内存中释放状态。
我的 Flink Pipeline 中有大约 300 条记录/秒
我的状态 TTL 配置:
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<ObjectNode>(
"my-state",
TypeInformation.of(new TypeHint<ObjectNode>() {})
);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(600))
.cleanupInRocksdbCompactFilter(2)
.build();
descriptor.enableTimeToLive(ttlConfig);
myState = getRuntimeContext().getListState(descriptor);
}
flink-conf.yaml:
state.backend: rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.block.blocksize: 16kb
state.backend.rocksdb.compaction.level.use-dynamic-size: true
state.backend.rocksdb.thread.num: 4
state.checkpoints.dir: file:///opt/flink/checkpoint
state.backend.rocksdb.timer-service.factory: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
jobmanager.execution.failover-strategy: region
rest.port: 8081
state.backend.rocksdb.memory.managed: true
# state.backend.rocksdb.memory.fixed-per-slot: 20mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.9
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
taskmanager.memory.managed.fraction: 0.6
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 500mb
taskmanager.memory.network.max: 700mb
taskmanager.memory.process.size: 5500mb
taskmanager.memory.task.off-heap.size: 800mb
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: ####
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: ####
metrics.reporter.influxdb.username: ####
metrics.reporter.influxdb.password: ####
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
Influxdb 和 Grafana 监控:
正如这个 TTL 清理的名称所暗示的那样 (cleanupInRocksdbCompactFilter),它依赖于仅在压缩期间运行的自定义 RocksDB 压缩过滤器。 docs.
中有更多详细信息
屏幕截图中的指标显示一直没有 运行 压缩。我想此时数据的大小还不足以开始任何压缩。
Compaction Filter does not free states from memory.
我假设主RAM内存是说'from memory'的意思。如果是这样,压实根本就没有运行。 RocksDB 在主内存中保存的数据大小始终是有限的。它基本上是一个缓存,过期的未触及状态最终应该被逐出。随着时间的推移,其余部分会定期溢出到磁盘和 gets compacted。这是此 TTL 清理应该从系统中删除过期状态的时候。
我有一个 Flink 集群。我启用了压缩过滤器并使用状态 TTL。但是 Rocksdb 压缩过滤器不会从内存中释放状态。
我的 Flink Pipeline 中有大约 300 条记录/秒
我的状态 TTL 配置:
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<ObjectNode>(
"my-state",
TypeInformation.of(new TypeHint<ObjectNode>() {})
);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(600))
.cleanupInRocksdbCompactFilter(2)
.build();
descriptor.enableTimeToLive(ttlConfig);
myState = getRuntimeContext().getListState(descriptor);
}
flink-conf.yaml:
state.backend: rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.block.blocksize: 16kb
state.backend.rocksdb.compaction.level.use-dynamic-size: true
state.backend.rocksdb.thread.num: 4
state.checkpoints.dir: file:///opt/flink/checkpoint
state.backend.rocksdb.timer-service.factory: rocksdb
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
jobmanager.execution.failover-strategy: region
rest.port: 8081
state.backend.rocksdb.memory.managed: true
# state.backend.rocksdb.memory.fixed-per-slot: 20mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.9
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
taskmanager.memory.managed.fraction: 0.6
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 500mb
taskmanager.memory.network.max: 700mb
taskmanager.memory.process.size: 5500mb
taskmanager.memory.task.off-heap.size: 800mb
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: ####
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: ####
metrics.reporter.influxdb.username: ####
metrics.reporter.influxdb.password: ####
metrics.reporter.influxdb.consistency: ANY
metrics.reporter.influxdb.connectTimeout: 60000
metrics.reporter.influxdb.writeTimeout: 60000
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
Influxdb 和 Grafana 监控:
正如这个 TTL 清理的名称所暗示的那样 (cleanupInRocksdbCompactFilter),它依赖于仅在压缩期间运行的自定义 RocksDB 压缩过滤器。 docs.
中有更多详细信息屏幕截图中的指标显示一直没有 运行 压缩。我想此时数据的大小还不足以开始任何压缩。
Compaction Filter does not free states from memory.
我假设主RAM内存是说'from memory'的意思。如果是这样,压实根本就没有运行。 RocksDB 在主内存中保存的数据大小始终是有限的。它基本上是一个缓存,过期的未触及状态最终应该被逐出。随着时间的推移,其余部分会定期溢出到磁盘和 gets compacted。这是此 TTL 清理应该从系统中删除过期状态的时候。