如何查看保存点的恢复值?
How to see restored value from savepoints?
我尝试记录有关使用保存点启动 Flink 应用程序时恢复的状态的信息。
我看到该应用程序已正确使用保存点启动。但是我无法记录已恢复的状态值。
该应用计算购买了多少产品。
我可以这样使用CheckpointedFunction吗?
我看到 context.isRestored() return 为真,但 checkpointedState 始终为空。我哪里误解了什么?
public class ProductStatisticFunction extends RichFlatMapFunction<User, ProductStatistic> implements CheckpointedFunction {
private transient ValueState<ProductStatistic> statisticsValueState;
private transient ListState<ProductStatistic> checkpointedState;
@Override
public void flatMap(User user, Collector<ProductStatistic> out) throws Exception {
ProductStatistic currentValue = statisticsValueState.value();
if (currentValue.getAge() == 0) {
currentValue.setAge(user.getAge());
}
currentValue.setAmount(currentValue.getAmount() + 1);
statisticsValueState.update(currentValue);
out.collect(new ProductStatistic(currentValue.getId, currentValue.getAge(), currentValue.getAmount()));
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<ProductStatistic> descriptor =
new ValueStateDescriptor<>("age-statistic", TypeInformation.of(new TypeHint<>() {
}),
new ProductStatistic());
statisticsValueState = getRuntimeContext().getState(descriptor);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.checkpointedState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("restored", ProductStatistic.class));
if (context.isRestored()) {
for (ProductStatistic entry : this.checkpointedState.get()) {
log.warn("...."));
}
}
}
}
恐怕这不是它的工作原理,并且没有直接的方法来查看正在恢复到 age-statistic 的 key/value 对值状态。
但是,您可以使用状态处理器 API 检查保存点中存储的状态。或者您可以将 ProductStatisticFunction
更改为 KeyedBroadcastProcessFunction
并使用 applyToKeyedState
在作业为 运行.
时随时检查实时状态值
我尝试记录有关使用保存点启动 Flink 应用程序时恢复的状态的信息。 我看到该应用程序已正确使用保存点启动。但是我无法记录已恢复的状态值。
该应用计算购买了多少产品。
我可以这样使用CheckpointedFunction吗? 我看到 context.isRestored() return 为真,但 checkpointedState 始终为空。我哪里误解了什么?
public class ProductStatisticFunction extends RichFlatMapFunction<User, ProductStatistic> implements CheckpointedFunction {
private transient ValueState<ProductStatistic> statisticsValueState;
private transient ListState<ProductStatistic> checkpointedState;
@Override
public void flatMap(User user, Collector<ProductStatistic> out) throws Exception {
ProductStatistic currentValue = statisticsValueState.value();
if (currentValue.getAge() == 0) {
currentValue.setAge(user.getAge());
}
currentValue.setAmount(currentValue.getAmount() + 1);
statisticsValueState.update(currentValue);
out.collect(new ProductStatistic(currentValue.getId, currentValue.getAge(), currentValue.getAmount()));
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<ProductStatistic> descriptor =
new ValueStateDescriptor<>("age-statistic", TypeInformation.of(new TypeHint<>() {
}),
new ProductStatistic());
statisticsValueState = getRuntimeContext().getState(descriptor);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.checkpointedState = context.getKeyedStateStore().getListState(new ListStateDescriptor<>("restored", ProductStatistic.class));
if (context.isRestored()) {
for (ProductStatistic entry : this.checkpointedState.get()) {
log.warn("...."));
}
}
}
}
恐怕这不是它的工作原理,并且没有直接的方法来查看正在恢复到 age-statistic 的 key/value 对值状态。
但是,您可以使用状态处理器 API 检查保存点中存储的状态。或者您可以将 ProductStatisticFunction
更改为 KeyedBroadcastProcessFunction
并使用 applyToKeyedState
在作业为 运行.