apache beam python sdk 是否可以进行状态处理?
Is stateful processing possible with the apache beam python sdk?
我一直在关注 Timely (and Stateful) Processing with Apache Beam 文章,虽然内容全面且写得很好,但它没有具体说明如何使用 python 实现同样的效果。进一步来说
它指出:
State and timers are not yet supported in Beam's Python SDK.
虽然它没有说明这样做的原因......这是不可能的先天原因吗?
我希望为我打算实现的信号处理系统实现重播缓冲区/windowing 系统。由此,长度为 W 的特征的滑动 window / 历史帧缓冲区不断更新为最新的 window。
在 Java 中,它的实现如下所示:
static class FeatureFrameBuffer 扩展了 DoFn,FeatureFrame> {
整数缓冲区大小;
public FeatureFrameBuffer(Integer bufferSize) {
this.bufferSize = bufferSize;
}
@StateId("buffer")
private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<KV<String, Double>> bufferState,
@StateId("count") ValueState<Integer> countState
) {
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element());
// Only output buffer if count is greater than bufferSize
// Remove last element from buffer if count
// greater than or equals buferSize
if (count >= bufferSize) {
bufferState.read();
createFeatureFrame();
context.output(featureFrame);
bufferState.clear();
countState.clear();
}
}
}
在我开始开发自定义实现之前,我想知道是否可以使用 python sdk 实现同样的效果。关于此事的一些建议会很好。
截至今天,Python SDK 对状态处理的支持仍然是一个悬而未决的问题。请参阅 https://issues.apache.org/jira/browse/BEAM-2687, and it is blocked by this 工单:"Implement Beam Python User State and Timer API",但正在积极进行中。
从 Beam 版本 2.9.0 开始,用户状态和计时器可用。不过文档尚未更新。
我一直在关注 Timely (and Stateful) Processing with Apache Beam 文章,虽然内容全面且写得很好,但它没有具体说明如何使用 python 实现同样的效果。进一步来说 它指出:
State and timers are not yet supported in Beam's Python SDK.
虽然它没有说明这样做的原因......这是不可能的先天原因吗?
我希望为我打算实现的信号处理系统实现重播缓冲区/windowing 系统。由此,长度为 W 的特征的滑动 window / 历史帧缓冲区不断更新为最新的 window。
在 Java 中,它的实现如下所示:
static class FeatureFrameBuffer 扩展了 DoFn,FeatureFrame> { 整数缓冲区大小;
public FeatureFrameBuffer(Integer bufferSize) {
this.bufferSize = bufferSize;
}
@StateId("buffer")
private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<KV<String, Double>> bufferState,
@StateId("count") ValueState<Integer> countState
) {
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element());
// Only output buffer if count is greater than bufferSize
// Remove last element from buffer if count
// greater than or equals buferSize
if (count >= bufferSize) {
bufferState.read();
createFeatureFrame();
context.output(featureFrame);
bufferState.clear();
countState.clear();
}
}
}
在我开始开发自定义实现之前,我想知道是否可以使用 python sdk 实现同样的效果。关于此事的一些建议会很好。
截至今天,Python SDK 对状态处理的支持仍然是一个悬而未决的问题。请参阅 https://issues.apache.org/jira/browse/BEAM-2687, and it is blocked by this 工单:"Implement Beam Python User State and Timer API",但正在积极进行中。
从 Beam 版本 2.9.0 开始,用户状态和计时器可用。不过文档尚未更新。