无法在自定义 DataSink 上初始化字段(Flink CEP)
Cant initialize Field on Custom DataSink (Flink CEP)
我的 Apache Flink Streaming 有问题API。
我可以设法使用自定义数据源设置整个 CEP 环境,并且在 "print()" 等源上使用标准接收器时,一切正常。
这是我的水槽现在的样子:
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
我试图实现的是将方法引用传递给此 SinkFunction,它应为我的 DataStream 中的每个元素执行。
这是我初始化 SinkFunction 的方式:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
我的问题是,当我在自定义接收器的 "invoke" 方法中设置断点时,即使我显式调用构造函数(分配给消费者),消费者似乎也为空。
由于接收器分布到与接收器并行度一样多的实例,因此它应该是可序列化的。在集群上执行时,Sink
被序列化发送到 TaskManagers
,在那里它被反序列化。
在您的示例中,consumer
字段是 transient
,这就是为什么在序列化后它变成 null
。
我的 Apache Flink Streaming 有问题API。
我可以设法使用自定义数据源设置整个 CEP 环境,并且在 "print()" 等源上使用标准接收器时,一切正常。
这是我的水槽现在的样子:
@RequiredArgsConstructor
public class EventDataConsumer extends RichSinkFunction<EventData>{
private final transient Consumer<EventData> consumer;
@Override
public void invoke(EventData eventData) throws Exception {
consumer.accept(eventData);
}
}
我试图实现的是将方法引用传递给此 SinkFunction,它应为我的 DataStream 中的每个元素执行。
这是我初始化 SinkFunction 的方式:
EventDataConsumer consumer = new EventDataConsumer(someService::handleEventData);
outStream.addSink(consumer);
我的问题是,当我在自定义接收器的 "invoke" 方法中设置断点时,即使我显式调用构造函数(分配给消费者),消费者似乎也为空。
由于接收器分布到与接收器并行度一样多的实例,因此它应该是可序列化的。在集群上执行时,Sink
被序列化发送到 TaskManagers
,在那里它被反序列化。
在您的示例中,consumer
字段是 transient
,这就是为什么在序列化后它变成 null
。