Flink:如何在另一个流中存储状态和使用?
Flink: how to store state and use in another stream?
我有一个 Flink 的用例,我需要从文件中读取信息,存储每一行,然后使用这个状态来过滤另一个流。
我现在可以使用 connect
运算符和 RichCoFlatMapFunction
来完成所有这些工作,但感觉过于复杂。另外,我担心 flatMap2
可能会在从文件加载所有状态之前开始执行:
fileStream
.connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
.keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
.flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
private transient ValueState<String> storedPartId;
@Override
public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
// store state
storedPartId.update(partId);
}
@Override
public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
if (record.getPartId().equals(storedPartId.value())) {
out.collect(record);
} else {
// do nothing
}
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>(
"partId", // the state name
TypeInformation.of(new TypeHint<String>() {}),
null);
storedPartId = getRuntimeContext().getState(descriptor);
}
});
有没有更好的方法(从 Flink 1.1.3 开始)来完成这种加载状态模式,然后在后续流中使用它?
您对 CoFlatMapFunction
的担忧是正确的。 flatMap1
和 flatMap2
的调用顺序无法控制,取决于数据到达的顺序。因此,可能会在 flatMap1
.
读取所有数据之前调用 flatMap2
Flink 1.1.3 在开始处理流之前读取所有数据的唯一方法是在 RichFlatMapFunction
的 open()
方法中使用数据,即您必须手动读取并解析文件。
这基本上是一种广播连接策略,即运算符的每个并行实例都会这样做。缺点是文件的数据会被复制。好处是您不必随机播放 "main" 流(无需使用 keyBy()
)。
我有一个 Flink 的用例,我需要从文件中读取信息,存储每一行,然后使用这个状态来过滤另一个流。
我现在可以使用 connect
运算符和 RichCoFlatMapFunction
来完成所有这些工作,但感觉过于复杂。另外,我担心 flatMap2
可能会在从文件加载所有状态之前开始执行:
fileStream
.connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
.keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
.flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
private transient ValueState<String> storedPartId;
@Override
public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
// store state
storedPartId.update(partId);
}
@Override
public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
if (record.getPartId().equals(storedPartId.value())) {
out.collect(record);
} else {
// do nothing
}
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>(
"partId", // the state name
TypeInformation.of(new TypeHint<String>() {}),
null);
storedPartId = getRuntimeContext().getState(descriptor);
}
});
有没有更好的方法(从 Flink 1.1.3 开始)来完成这种加载状态模式,然后在后续流中使用它?
您对 CoFlatMapFunction
的担忧是正确的。 flatMap1
和 flatMap2
的调用顺序无法控制,取决于数据到达的顺序。因此,可能会在 flatMap1
.
flatMap2
Flink 1.1.3 在开始处理流之前读取所有数据的唯一方法是在 RichFlatMapFunction
的 open()
方法中使用数据,即您必须手动读取并解析文件。
这基本上是一种广播连接策略,即运算符的每个并行实例都会这样做。缺点是文件的数据会被复制。好处是您不必随机播放 "main" 流(无需使用 keyBy()
)。