flink SourceFunction<> 正在 StreamExecutionEnvironment.addSource() 中被替换?
flink SourceFunction<> is being replaced in StreamExecutionEnvironment.addSource()?
我 运行 在尝试创建自定义事件源时遇到了这个问题。其中包含一个队列,允许我的其他进程向其中添加项目。然后期望我的 CEP 模式在匹配时打印一些调试消息。
但是无论我向队列中添加什么,都没有匹配项。然后我注意到 mySource.run() 里面的队列总是空的。这意味着我用来创建 mySource 实例的队列与 StreamExecutionEnvironment
中的队列不同。如果我将队列更改为静态,强制所有实例共享同一个队列,一切都会按预期进行。
DummySource.java
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
所以我深入研究了StreamExecutionEnvironment
的源代码。在 addSource() 方法中。有一个 clean() 方法,它看起来像是将实例替换为新实例。
Returns a "closure-cleaned" version of the given function.
这是为什么?为什么需要序列化?
我还尝试使用 getConfig() 关闭干净的闭包。结果还是一样。我的队列实例与 env 使用的不同。
如何解决这个问题?
Flink中函数使用的clean()
方法主要是保证Function
(如SourceFunction、MapFunction)可序列化。 Flink 会将这些函数序列化并分发到任务节点上执行。
对于 Flink 主代码中的简单变量,例如 int,您可以在函数中简单地引用它们。但对于大型或不可序列化的,最好使用广播和丰富的源功能。请参考https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
我 运行 在尝试创建自定义事件源时遇到了这个问题。其中包含一个队列,允许我的其他进程向其中添加项目。然后期望我的 CEP 模式在匹配时打印一些调试消息。
但是无论我向队列中添加什么,都没有匹配项。然后我注意到 mySource.run() 里面的队列总是空的。这意味着我用来创建 mySource 实例的队列与 StreamExecutionEnvironment
中的队列不同。如果我将队列更改为静态,强制所有实例共享同一个队列,一切都会按预期进行。
DummySource.java
public class DummySource implements SourceFunction<String> {
private static final long serialVersionUID = 3978123556403297086L;
// private static Queue<String> queue = new LinkedBlockingQueue<String>();
private Queue<String> queue;
private boolean cancel = false;
public void setQueue(Queue<String> q){
queue = q;
}
@Override
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<String> ctx)
throws Exception {
System.out.println("run");
synchronized (queue) {
while (!cancel) {
if (queue.peek() != null) {
String e = queue.poll();
if (e.equals("exit")) {
cancel();
}
System.out.println("collect "+e);
ctx.collectWithTimestamp(e, System.currentTimeMillis());
}
}
}
}
@Override
public void cancel() {
System.out.println("canceled");
cancel = true;
}
}
所以我深入研究了StreamExecutionEnvironment
的源代码。在 addSource() 方法中。有一个 clean() 方法,它看起来像是将实例替换为新实例。
Returns a "closure-cleaned" version of the given function.
这是为什么?为什么需要序列化? 我还尝试使用 getConfig() 关闭干净的闭包。结果还是一样。我的队列实例与 env 使用的不同。
如何解决这个问题?
Flink中函数使用的clean()
方法主要是保证Function
(如SourceFunction、MapFunction)可序列化。 Flink 会将这些函数序列化并分发到任务节点上执行。
对于 Flink 主代码中的简单变量,例如 int,您可以在函数中简单地引用它们。但对于大型或不可序列化的,最好使用广播和丰富的源功能。请参考https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables