apache beam python sdk 是否可以进行状态处理?

Is stateful processing possible with the apache beam python sdk?

我一直在关注 Timely (and Stateful) Processing with Apache Beam 文章,虽然内容全面且写得很好,但它没有具体说明如何使用 python 实现同样的效果。进一步来说 它指出:

State and timers are not yet supported in Beam's Python SDK.

虽然它没有说明这样做的原因......这是不可能的先天原因吗?

我希望为我打算实现的信号处理系统实现重播缓冲区/windowing 系统。由此,长度为 W 的特征的滑动 window / 历史帧缓冲区不断更新为最新的 window。

在 Java 中,它的实现如下所示:

static class FeatureFrameBuffer 扩展了 DoFn,FeatureFrame> { 整数缓冲区大小;

    public FeatureFrameBuffer(Integer bufferSize) {
        this.bufferSize = bufferSize;
    }

    @StateId("buffer")
    private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();

    @StateId("count")
    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

    @ProcessElement
    public void process(
                        ProcessContext context,
                        @StateId("buffer") BagState<KV<String, Double>> bufferState,
                        @StateId("count") ValueState<Integer> countState
                        ) {

        int count = firstNonNull(countState.read(), 0);
        count = count + 1;
        countState.write(count);
        bufferState.add(context.element());

        // Only output buffer if count is greater than bufferSize
        // Remove last element from buffer if count
        // greater than or equals buferSize
        if (count >= bufferSize) {
            bufferState.read();
            createFeatureFrame();
            context.output(featureFrame);
            bufferState.clear();
            countState.clear();
        }
    }
}

在我开始开发自定义实现之前,我想知道是否可以使用 python sdk 实现同样的效果。关于此事的一些建议会很好。

截至今天,Python SDK 对状态处理的支持仍然是一个悬而未决的问题。请参阅 https://issues.apache.org/jira/browse/BEAM-2687, and it is blocked by this 工单:"Implement Beam Python User State and Timer API",但正在积极进行中。

从 Beam 版本 2.9.0 开始,用户状态和计时器可用。不过文档尚未更新。