Stateful processing with User Session Window in Apache Beam

I have a simple use case: I want to maintain a ValueState counter of the events that occur in each user session window.

When I tried this, I ran into the following exception:

java.lang.UnsupportedOperationException: MergingWindowFn is not supported for stateful DoFns, WindowFn is: org.apache.beam.sdk.transforms.windowing.Sessions@1d4df
    at org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.rejectMergingWindowFn (StatefulDoFnRunner.java:112)
    at org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.<init> (StatefulDoFnRunner.java:107)
    at org.apache.beam.repackaged.direct_java.runners.core.DoFnRunners.defaultStatefulDoFnRunner (DoFnRunners.java:157)
    at org.apache.beam.runners.direct.ParDoEvaluator.lambda$defaultRunnerFactory$0 (ParDoEvaluator.java:111)
    at org.apache.beam.runners.direct.ParDoEvaluator.create (ParDoEvaluator.java:156)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createParDoEvaluator (ParDoEvaluatorFactory.java:152)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator (ParDoEvaluatorFactory.java:123)
    at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.createEvaluator (StatefulParDoEvaluatorFactory.java:109)
    at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.forApplication (StatefulParDoEvaluatorFactory.java:89)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication (TransformEvaluatorRegistry.java:178)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:122)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

Code snippet:

        pipeline

                // read data from file
                .apply("ReadInputData", TextIO.read().from(options.getInputPath()))

                // parse json
                .apply("ParseJson", ParseJsons.of(InputEvents.class))
                    .setCoder(SerializableCoder.of(InputEvents.class))

                // add timestamp to events
                .apply("AddTimestamp", WithTimestamps.of(
                        (InputEvents events) -> {
                            return Instant.parse(events.getTimestamp(), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz"));
                        })
                )

                // key value pair for sessionID and events data
                .apply("MapEventsToKV", MapElements.via(
                        new SimpleFunction<InputEvents, KV<String, InputEvents>>() {
                            @Override
                            public KV<String, InputEvents> apply(InputEvents input) {
                                return KV.of(input.getSessionId(), input);
                            }
                        }))

                // window by user session
                .apply("SessionWindows", Window.<KV<String, InputEvents>>into(
                        Sessions.withGapDuration(Duration.standardMinutes(2)))
                        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
                )

                // output log
                .apply("Log", ParDo.of(new DoFn<KV<String, InputEvents>, String>() {

                    private static final String COUNTER_NAME = "occurrences_counter";

                    @StateId(COUNTER_NAME)
                    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());

                    @ProcessElement
                    public void processElement(@Element KV<String, InputEvents> userSessionEvents,
                                               OutputReceiver<String> outputReceiver,
                                               @StateId(COUNTER_NAME) ValueState<Integer> counterState,
                                               IntervalWindow window) {

                        int currentValue = Optional.ofNullable(counterState.read()).orElse(0);
                        int incrementedCounter = currentValue + 1;
                        counterState.write(incrementedCounter);

                        LOG.info("Window ==> {} :: counterValue ==> {}", window.toString(), incrementedCounter);
                    }
                }));

          return pipeline.run();

Assume the input data looks like this:

session_id | event_timestamp        | attr1 | attr2 |
1          |2021-08-29 10:54:54 UTC | x     | xx    |
1          |2021-08-29 10:55:54 UTC | x     | xx    |
2          |2021-08-29 10:55:59 UTC | x     | xx    |
2          |2021-08-29 10:56:35 UTC | x     | xx    |
1          |2021-08-29 10:56:14 UTC | x     | xx    |

The expected output is:

Window ==> 2021-08-29T10:54:54.000Z..2021-08-29T10:58:14.000Z :: counterValue ==> 3
Window ==> 2021-08-29T10:55:59.000Z..2021-08-29T10:58:35.000Z :: counterValue ==> 2

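Digging into the Beam code, I found that session windows use a merging WindowFn and that state cannot be maintained across merging windows in a stateful DoFn, which is why I got the exception above.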
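To make the merging behavior concrete, here is a minimal plain-Java sketch (not Beam code) of how session windows are formed: each event starts as its own window [t, t + gap), and overlapping windows for the same key are merged. The class `SessionMergeSketch` and its `Session` holder are hypothetical names introduced only for this illustration, and the sketch uses `java.time` rather than the Joda-Time types Beam uses. Because windows change as they merge, any per-window state would have to be merged as well, which is what the stateful DoFn runner rejects.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration of session-window merging; this is not Beam code.
class SessionMergeSketch {

    // Hypothetical holder for one merged session: [start, end) plus its element count.
    static final class Session {
        Instant start;
        Instant end;
        int count;

        Session(Instant start, Instant end) {
            this.start = start;
            this.end = end;
            this.count = 1;
        }

        @Override
        public String toString() {
            return start + ".." + end + " :: count ==> " + count;
        }
    }

    // Each timestamp starts as its own window [t, t + gap); overlapping windows are merged.
    static List<Session> merge(List<Instant> timestamps, Duration gap) {
        List<Instant> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Session> sessions = new ArrayList<>();
        for (Instant t : sorted) {
            Session last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && t.isBefore(last.end)) {
                // The new window [t, t + gap) overlaps the current session: extend it.
                Instant candidateEnd = t.plus(gap);
                if (candidateEnd.isAfter(last.end)) {
                    last.end = candidateEnd;
                }
                last.count++;
            } else {
                // No overlap: this event opens a new session.
                sessions.add(new Session(t, t.plus(gap)));
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(2);
        // Events for session_id 1 from the sample input.
        List<Instant> sessionOne = Arrays.asList(
                Instant.parse("2021-08-29T10:54:54Z"),
                Instant.parse("2021-08-29T10:55:54Z"),
                Instant.parse("2021-08-29T10:56:14Z"));
        // Events for session_id 2 from the sample input.
        List<Instant> sessionTwo = Arrays.asList(
                Instant.parse("2021-08-29T10:55:59Z"),
                Instant.parse("2021-08-29T10:56:35Z"));
        merge(sessionOne, gap).forEach(s -> System.out.println("session_id 1 -> " + s));
        merge(sessionTwo, gap).forEach(s -> System.out.println("session_id 2 -> " + s));
    }
}
```

For the sample input this yields one session per session_id, with the same window boundaries and counts (3 and 2) as the expected output listed earlier.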

Later, I implemented the use case with GlobalWindows + State + Timer.

The timer resets the counter if no new message arrives for the same session_id within 2 minutes.

Reference: https://beam.apache.org/blog/timely-processing

.apply("GlobalWindows", Window.<KV<String, InputEvents>>into(
        new GlobalWindows()
    )
        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
        .triggering(
                Repeatedly.forever(AfterProcessingTime.
                        pastFirstElementInPane().plusDelayOf(Duration.ZERO)
                )).withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

.apply("Log", ParDo.of(new DoFn<KV<String, InputEvents>, String>() {

    private static final String COUNTER_NAME = "occurrences_counter";
    private static final String GC_TIMER = "gcTimer";

    // processing-time timer that clears the counter after 2 minutes without events for the key
    @TimerId(GC_TIMER)
    private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @StateId(COUNTER_NAME)
    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void processElement(@Element KV<String, InputEvents> userSessionEvents,
                               OutputReceiver<String> outputReceiver,
                               @StateId(COUNTER_NAME) ValueState<Integer> counterState,
                               @TimerId(GC_TIMER) Timer gcTimer,
                               BoundedWindow window) {

        int currentValue = Optional.ofNullable(counterState.read()).orElse(0);
        int incrementedCounter = currentValue + 1;
        counterState.write(incrementedCounter);

        // re-arm the timer on every element; setting it again replaces the previous deadline
        gcTimer.offset(Duration.standardMinutes(2)).setRelative();

        // in the global window this logs the GlobalWindow, not a per-session interval
        LOG.info("Window ==> {} :: counterValue ==> {}", window.toString(), incrementedCounter);
    }

    @OnTimer(GC_TIMER)
    public void onStale(@StateId(COUNTER_NAME) ValueState<Integer> counterState) {
        counterState.clear();
    }
}));
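
The counter-and-timer logic above can be simulated without Beam to check the reset behavior. The following is a rough plain-Java sketch under stated assumptions: `SessionCounterSketch` is a hypothetical class, two maps stand in for the per-key ValueState and timer, and the caller passes the current time explicitly instead of relying on a processing-time clock as the real pipeline does.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the "counter state + GC timer" idea; this is not Beam code.
class SessionCounterSketch {

    private final Duration gap;
    // Stands in for the per-key ValueState<Integer>.
    private final Map<String, Integer> counters = new HashMap<>();
    // Stands in for the per-key timer: the instant at which the counter should be cleared.
    private final Map<String, Instant> deadlines = new HashMap<>();

    SessionCounterSketch(Duration gap) {
        this.gap = gap;
    }

    // Fires a due timer for the key (if any), then counts the event and re-arms the timer.
    int onEvent(String sessionId, Instant now) {
        fireDueTimer(sessionId, now);
        int incremented = counters.getOrDefault(sessionId, 0) + 1;
        counters.put(sessionId, incremented);
        // Setting the deadline again replaces the earlier one, like re-setting a timer.
        deadlines.put(sessionId, now.plus(gap));
        return incremented;
    }

    // Mirrors the timer callback: clear the counter once the deadline has been reached.
    private void fireDueTimer(String sessionId, Instant now) {
        Instant deadline = deadlines.get(sessionId);
        if (deadline != null && !now.isBefore(deadline)) {
            counters.remove(sessionId);
            deadlines.remove(sessionId);
        }
    }

    public static void main(String[] args) {
        SessionCounterSketch sketch = new SessionCounterSketch(Duration.ofMinutes(2));
        // Three events for session_id 1 within the gap, then one event after the gap has elapsed.
        System.out.println(sketch.onEvent("1", Instant.parse("2021-08-29T10:54:54Z")));
        System.out.println(sketch.onEvent("1", Instant.parse("2021-08-29T10:55:54Z")));
        System.out.println(sketch.onEvent("1", Instant.parse("2021-08-29T10:56:14Z")));
        System.out.println(sketch.onEvent("1", Instant.parse("2021-08-29T10:59:00Z")));
    }
}
```

In this simulation the first three events count up to 3, and the fourth event, arriving after the 2-minute deadline, starts again at 1. Note that the real pipeline measures the 2 minutes in processing time, so replaying historical data quickly would not reset the counter the way event timestamps suggest.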