Stateful processing with User Session Window in Apache Beam
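I have a simple use case: maintaining a value-state counter for the events that occur within each user session window.
When I tried this, I got the following exception: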
java.lang.UnsupportedOperationException: MergingWindowFn is not supported for stateful DoFns, WindowFn is: org.apache.beam.sdk.transforms.windowing.Sessions@1d4df
at org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.rejectMergingWindowFn (StatefulDoFnRunner.java:112)
at org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.<init> (StatefulDoFnRunner.java:107)
at org.apache.beam.repackaged.direct_java.runners.core.DoFnRunners.defaultStatefulDoFnRunner (DoFnRunners.java:157)
at org.apache.beam.runners.direct.ParDoEvaluator.lambda$defaultRunnerFactory$0 (ParDoEvaluator.java:111)
at org.apache.beam.runners.direct.ParDoEvaluator.create (ParDoEvaluator.java:156)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createParDoEvaluator (ParDoEvaluatorFactory.java:152)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator (ParDoEvaluatorFactory.java:123)
at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.createEvaluator (StatefulParDoEvaluatorFactory.java:109)
at org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory.forApplication (StatefulParDoEvaluatorFactory.java:89)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication (TransformEvaluatorRegistry.java:178)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:122)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
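The code snippet below:
- reads data from a file (for testing; the real scenario will be streaming)
- parses the JSON
- assigns event timestamps
- converts the records to a key-value PCollection
- applies session windows per key (sessionId)
- increments a value state in a ParDo and logs it to verify the counter state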
pipeline
    // read data from file
    .apply("ReadInputData", TextIO.read().from(options.getInputPath()))
    // parse json
    .apply("ParseJson", ParseJsons.of(InputEvents.class))
    .setCoder(SerializableCoder.of(InputEvents.class))
    // add timestamp to events
    .apply("AddTimestamp", WithTimestamps.of(
        (InputEvents events) -> Instant.parse(
            events.getTimestamp(), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss zzz"))))
    // key value pair for sessionID and events data
    .apply("MapEventsToKV", MapElements.via(
        new SimpleFunction<InputEvents, KV<String, InputEvents>>() {
            @Override
            public KV<String, InputEvents> apply(InputEvents input) {
                return KV.of(input.getSessionId(), input);
            }
        }))
    // window by user session (withTimestampCombiner is a method of Window, not of Sessions)
    .apply("SessionWindows", Window.<KV<String, InputEvents>>into(
            Sessions.withGapDuration(Duration.standardMinutes(2)))
        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
    // output log: this stateful ParDo is what throws the exception above
    .apply("Log", ParDo.of(new DoFn<KV<String, InputEvents>, String>() {
        private static final String COUNTER_NAME = "occurrences_counter";

        @StateId(COUNTER_NAME)
        private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void processElement(@Element KV<String, InputEvents> userSessionEvents,
                                   OutputReceiver<String> outputReceiver,
                                   @StateId(COUNTER_NAME) ValueState<Integer> counterState,
                                   IntervalWindow window) {
            int currentValue = Optional.ofNullable(counterState.read()).orElse(0);
            int incrementedCounter = currentValue + 1;
            counterState.write(incrementedCounter);
            LOG.info("Window ==> {} :: counterValue ==> {}", window.toString(), incrementedCounter);
        }
    }));
return pipeline.run();
Suppose the input data looks like this:
session_id | event_timestamp         | attr1 | attr2
1          | 2021-08-29 10:54:54 UTC | x     | xx
1          | 2021-08-29 10:55:54 UTC | x     | xx
2          | 2021-08-29 10:55:59 UTC | x     | xx
2          | 2021-08-29 10:56:35 UTC | x     | xx
1          | 2021-08-29 10:56:14 UTC | x     | xx
The expected output is:
Window ==> 2021-08-29T10:54:54.000Z..2021-08-29T10:58:14.000Z :: counterValue ==> 3
Window ==> 2021-08-29T10:55:59.000Z..2021-08-29T10:58:35.000Z :: counterValue ==> 2
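Digging into the Beam code, I found that session windows are merging windows (Sessions is a MergingWindowFn), and state cannot be maintained across merged windows, which is why I got the exception above.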
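To make the merging behaviour concrete, here is a plain-Java model (no Beam dependency, and not Beam's actual implementation) of what a merging session WindowFn does with the sample data. Each event first gets its own proto-window [t, t + 2 min) carrying, in this toy model, a counter of 1; overlapping proto-windows with the same key are then collapsed into one window, and their counters have to be combined (summed here). The names `SessionMergeSketch` and `mergeSessions` are hypothetical. Having to decide how to combine per-window state on every merge is presumably why stateful DoFns reject merging WindowFns such as `Sessions`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (no Beam, hypothetical names): each event starts in a proto-window
 * [t, t + gap) with a counter of 1; overlapping proto-windows of the same key
 * are merged and their counters are summed.
 */
public class SessionMergeSketch {

    static final class Event {
        final String key;
        final Instant ts;

        Event(String key, Instant ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    static final class SessionWindow {
        final Instant start;
        Instant end;
        int counter; // the per-window "state" in this toy model

        SessionWindow(Instant start, Instant end, int counter) {
            this.start = start;
            this.end = end;
            this.counter = counter;
        }

        @Override
        public String toString() {
            return "Window ==> " + start + ".." + end + " :: counterValue ==> " + counter;
        }
    }

    static List<SessionWindow> mergeSessions(List<Event> events, Duration gap) {
        // One proto-window per event, grouped by key (TreeMap keeps key order stable).
        Map<String, List<SessionWindow>> byKey = new TreeMap<>();
        for (Event e : events) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>())
                 .add(new SessionWindow(e.ts, e.ts.plus(gap), 1));
        }
        List<SessionWindow> merged = new ArrayList<>();
        for (List<SessionWindow> protos : byKey.values()) {
            protos.sort((a, b) -> a.start.compareTo(b.start));
            SessionWindow current = null;
            for (SessionWindow w : protos) {
                if (current != null && w.start.isBefore(current.end)) {
                    // Overlap: extend the session and combine the two counters.
                    if (w.end.isAfter(current.end)) {
                        current.end = w.end;
                    }
                    current.counter += w.counter;
                } else {
                    if (current != null) {
                        merged.add(current);
                    }
                    current = w;
                }
            }
            if (current != null) {
                merged.add(current);
            }
        }
        return merged;
    }

    // The sample input above (only session_id and event_timestamp matter here).
    static List<Event> sampleEvents() {
        return List.of(
            new Event("1", Instant.parse("2021-08-29T10:54:54Z")),
            new Event("1", Instant.parse("2021-08-29T10:55:54Z")),
            new Event("2", Instant.parse("2021-08-29T10:55:59Z")),
            new Event("2", Instant.parse("2021-08-29T10:56:35Z")),
            new Event("1", Instant.parse("2021-08-29T10:56:14Z")));
    }

    public static void main(String[] args) {
        mergeSessions(sampleEvents(), Duration.ofMinutes(2)).forEach(System.out::println);
    }
}
```

Running `main` should print the same two windows and counts as the expected output, except that `java.time.Instant` omits the `.000` milliseconds. A sum merges trivially, but there is no single obvious way to merge an arbitrary `ValueState`.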
Eventually I implemented the use case with GlobalWindows plus State + Timer.
The timer resets the counter if no new message arrives for the same session_id within 2 minutes.
Reference: https://beam.apache.org/blog/timely-processing
.apply("GlobalWindows", Window.<KV<String, InputEvents>>into(new GlobalWindows())
    .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply("Log", ParDo.of(new DoFn<KV<String, InputEvents>, String>() {
    private static final String COUNTER_NAME = "occurrences_counter";
    private static final String GC_TIMER = "gcTimer";

    @TimerId(GC_TIMER)
    private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @StateId(COUNTER_NAME)
    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void processElement(@Element KV<String, InputEvents> userSessionEvents,
                               OutputReceiver<String> outputReceiver,
                               @StateId(COUNTER_NAME) ValueState<Integer> counterState,
                               @TimerId(GC_TIMER) Timer gcTimer) {
        int currentValue = Optional.ofNullable(counterState.read()).orElse(0);
        int incrementedCounter = currentValue + 1;
        counterState.write(incrementedCounter);
        // (Re)arm the timer: every new element for this session_id pushes it 2 minutes ahead
        gcTimer.offset(Duration.standardMinutes(2)).setRelative();
        // No IntervalWindow exists in the global window (and `window` was undeclared), so log the key
        LOG.info("Session ==> {} :: counterValue ==> {}", userSessionEvents.getKey(), incrementedCounter);
    }

    @OnTimer(GC_TIMER)
    public void onStale(@StateId(COUNTER_NAME) ValueState<Integer> counterState) {
        // No new message for this session_id within 2 minutes: reset the counter
        counterState.clear();
    }
}));
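The State + Timer logic in the DoFn above can be simulated outside Beam to check the reset behaviour. This is a minimal sketch under stated assumptions: the class `CounterWithResetTimer` is hypothetical, a simulated clock stands in for processing time, and each key holds a single timer that is overwritten whenever it is set again, mirroring how re-setting `gcTimer` replaces its previous deadline.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Simulation (no Beam, hypothetical class) of a per-key counter with a per-key
 * "gc" timer that is pushed forward on every element and clears the counter
 * when it fires.
 */
public class CounterWithResetTimer {

    private final Duration timeout;
    private final Map<String, Integer> counterState = new HashMap<>(); // like ValueState<Integer>
    private final Map<String, Instant> gcTimer = new HashMap<>();      // one deadline per key

    CounterWithResetTimer(Duration timeout) {
        this.timeout = timeout;
    }

    /** Moves the simulated clock forward and fires every timer whose deadline has passed. */
    private void advanceClockTo(Instant now) {
        gcTimer.entrySet().removeIf(timer -> {
            boolean expired = !now.isBefore(timer.getValue());
            if (expired) {
                counterState.remove(timer.getKey()); // what onStale's counterState.clear() does
            }
            return expired;
        });
    }

    /** Mirrors processElement: increment the counter, then (re)arm the timer. */
    int process(String key, Instant now) {
        advanceClockTo(now);
        int incremented = counterState.getOrDefault(key, 0) + 1;
        counterState.put(key, incremented);
        gcTimer.put(key, now.plus(timeout)); // overwrites any earlier deadline for this key
        return incremented;
    }

    public static void main(String[] args) {
        CounterWithResetTimer fn = new CounterWithResetTimer(Duration.ofMinutes(2));
        String[][] arrivals = {
            {"1", "2021-08-29T10:54:54Z"},
            {"1", "2021-08-29T10:55:54Z"},
            {"2", "2021-08-29T10:55:59Z"},
            {"1", "2021-08-29T10:56:14Z"},
            {"2", "2021-08-29T10:56:35Z"},
            {"1", "2021-08-29T10:59:00Z"}, // hypothetical extra event, not in the sample data
        };
        for (String[] a : arrivals) {
            int count = fn.process(a[0], Instant.parse(a[1]));
            System.out.println("Session ==> " + a[0] + " :: counterValue ==> " + count);
        }
    }
}
```

The sample events are replayed in timestamp order as if that were their arrival order. The final event for session 1 at 10:59:00 arrives after its 10:58:14 deadline, so the counter has already been cleared and starts again at 1. Note that the original Sessions approach was event-time based while this timer uses processing time, so with real streaming input the two can group events differently.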