Flink 水印未在 Python 中推进,停留在 -9223372036854775808
Flink watermarks not advancing in Python, stuck at -9223372036854775808
我在多个管道中都遇到过这个问题,但一直找不到答案。当 运行 使用时间戳分配器为单调或越界时间戳分配水印策略的管道时,时间戳被正确提取并正在推进,但水印停留在 -9223372036854775808。我尝试 运行 pyflink 库中的 event_time_timer.py 示例作为完整性检查,但经检查 process_element 和 on_timer 方法都没有移动 -9223372036854775808 的水印。和 9223372036854775807 分别。
这是流程函数和时间戳分配器的代码:
class Sum(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
state_ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
.disable_cleanup_in_background() \
.build()
state_descriptor.enable_time_to_live(state_ttl_config)
self.state = runtime_context.get_state(state_descriptor)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
# retrieve the current count
current = self.state.value()
if current is None:
current = 0
# update the state's count
current += value[2]
self.state.update(current)
# register an event time timer 2 seconds later
ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield ctx.get_current_key(), self.state.value(), timestamp
class MyTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, value, record_timestamp: int) -> int:
return int(value[0])
这是主要功能:
def event_timer_timer_demo():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(1)
ds = env.from_collection(
collection=[
(1000, 'Alice', 110.1),
(4000, 'Bob', 30.2),
(3000, 'Alice', 20.0),
(2000, 'Bob', 53.1),
(5000, 'Alice', 13.1),
(3000, 'Bob', 3.1),
(7000, 'Bob', 16.1),
(10000, 'Alice', 20.1)
],
type_info=Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()]))
ds = ds.assign_timestamps_and_watermarks(
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
.with_timestamp_assigner(MyTimestampAssigner()))
# apply the process function onto a keyed stream
ds.key_by(lambda value: value[1]) \
.process(Sum()) \
.print()
# submit for execution
env.execute()
无论我使用哪种水印策略,我的主管道都存在同样的问题。水印不应该链接到时间戳并在 ProcessFunction.Context 中看到吗?
这个问题主要是因为Source是一个BOUNDED Source。在触发WatermarkStrategy之前,整个Flink Job的执行就结束了。
可以参考下面的例子生成User记录而不是fromCollection
/** Data-generating source function. */
public static final class Generator
implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {
private static final long serialVersionUID = -2819385275681175792L;
private final int numKeys;
private final int idlenessMs;
private final int recordsToEmit;
private volatile int numRecordsEmitted = 0;
private volatile boolean canceled = false;
private ListState<Integer> state = null;
Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
this.numKeys = numKeys;
this.idlenessMs = idlenessMs;
this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
}
@Override
public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
while (numRecordsEmitted < recordsToEmit) {
synchronized (ctx.getCheckpointLock()) {
for (int i = 0; i < numKeys; i++) {
ctx.collect(Tuple2.of(i, numRecordsEmitted));
numRecordsEmitted++;
}
}
Thread.sleep(idlenessMs);
}
while (!canceled) {
Thread.sleep(50);
}
}
@Override
public void cancel() {
canceled = true;
}
}
}
我在多个管道中都遇到过这个问题,但一直找不到答案。当 运行 使用时间戳分配器为单调或越界时间戳分配水印策略的管道时,时间戳被正确提取并正在推进,但水印停留在 -9223372036854775808。我尝试 运行 pyflink 库中的 event_time_timer.py 示例作为完整性检查,但经检查 process_element 和 on_timer 方法都没有移动 -9223372036854775808 的水印。和 9223372036854775807 分别。
这是流程函数和时间戳分配器的代码:
class Sum(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
state_descriptor = ValueStateDescriptor("state", Types.FLOAT())
state_ttl_config = StateTtlConfig \
.new_builder(Time.seconds(1)) \
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
.disable_cleanup_in_background() \
.build()
state_descriptor.enable_time_to_live(state_ttl_config)
self.state = runtime_context.get_state(state_descriptor)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
# retrieve the current count
current = self.state.value()
if current is None:
current = 0
# update the state's count
current += value[2]
self.state.update(current)
# register an event time timer 2 seconds later
ctx.timer_service().register_event_time_timer(ctx.timestamp() + 2000)
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield ctx.get_current_key(), self.state.value(), timestamp
class MyTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, value, record_timestamp: int) -> int:
return int(value[0])
这是主要功能:
def event_timer_timer_demo():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(1)
ds = env.from_collection(
collection=[
(1000, 'Alice', 110.1),
(4000, 'Bob', 30.2),
(3000, 'Alice', 20.0),
(2000, 'Bob', 53.1),
(5000, 'Alice', 13.1),
(3000, 'Bob', 3.1),
(7000, 'Bob', 16.1),
(10000, 'Alice', 20.1)
],
type_info=Types.TUPLE([Types.LONG(), Types.STRING(), Types.FLOAT()]))
ds = ds.assign_timestamps_and_watermarks(
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(2))
.with_timestamp_assigner(MyTimestampAssigner()))
# apply the process function onto a keyed stream
ds.key_by(lambda value: value[1]) \
.process(Sum()) \
.print()
# submit for execution
env.execute()
无论我使用哪种水印策略,我的主管道都存在同样的问题。水印不应该链接到时间戳并在 ProcessFunction.Context 中看到吗?
这个问题主要是因为Source是一个BOUNDED Source。在触发WatermarkStrategy之前,整个Flink Job的执行就结束了。
可以参考下面的例子生成User记录而不是fromCollection
/** Data-generating source function. */
public static final class Generator
implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {
private static final long serialVersionUID = -2819385275681175792L;
private final int numKeys;
private final int idlenessMs;
private final int recordsToEmit;
private volatile int numRecordsEmitted = 0;
private volatile boolean canceled = false;
private ListState<Integer> state = null;
Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
this.numKeys = numKeys;
this.idlenessMs = idlenessMs;
this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
}
@Override
public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
while (numRecordsEmitted < recordsToEmit) {
synchronized (ctx.getCheckpointLock()) {
for (int i = 0; i < numKeys; i++) {
ctx.collect(Tuple2.of(i, numRecordsEmitted));
numRecordsEmitted++;
}
}
Thread.sleep(idlenessMs);
}
while (!canceled) {
Thread.sleep(50);
}
}
@Override
public void cancel() {
canceled = true;
}
}
}