调用 execute() 后是否可以在 FLINK CEP 中添加新模式?
is it possible to add new patterns in FLINK CEP after calling execute()?
我的代码如下:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
...定义我的模式
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
此代码有效并执行我想要的操作,并且我得到了一个遵循我设置的模式的结果流。
我想知道的是,是否可以将新模式应用到我稍后添加到环境中的这个源,从而在不调用 env.execute() 另一个的情况下获得匹配不同模式的不同结果流时间是因为当我这样做时,除了我的新结果流之外,我还会得到多余的旧结果流(即旧模式被多次执行)?
目前 Flink 的 CEP 库不支持开箱即用的动态模式更改。因此,一旦您定义了您的模式并开始了您的工作,它将只处理这个定义的模式。
但是,您可以编写自己的运算符来实现 TwoInputStreamOperator
接口,该接口在一个输入模式定义上接收,在另一个输入上接收流记录(类似于 CoFlatMap 函数)。对于每个新模式,您都必须在运算符上编译一个新的 NFA
,并将任何新的传入流元素也提供给这个 NFA
。这样,您就可以实现预期的行为。
未来,我们很可能会将此功能添加到 Flink 的 CEP 库中。
我的代码如下:
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyObject> input = env.addSource(new MyCustomSource());
Pattern<MyObject, ?> pattern = Pattern.<MyObject>begin("start");
PatternStream<MyObject> patternStream = CEP.pattern(input, pattern);
...定义我的模式
DataStream<MyObject> resultStream = patternStream.select(new MyCustomPatternSelectFunction());
resultStream.addSink(new MyCustomSinkFunction(subscriptionCriteria));
try
{
env.execute();
}
catch (Exception exception)
{
log.debug("Error while ", exception);
}
此代码有效并执行我想要的操作,并且我得到了一个遵循我设置的模式的结果流。
我想知道的是,是否可以将新模式应用到我稍后添加到环境中的这个源,从而在不调用 env.execute() 另一个的情况下获得匹配不同模式的不同结果流时间是因为当我这样做时,除了我的新结果流之外,我还会得到多余的旧结果流(即旧模式被多次执行)?
目前 Flink 的 CEP 库不支持开箱即用的动态模式更改。因此,一旦您定义了您的模式并开始了您的工作,它将只处理这个定义的模式。
但是,您可以编写自己的运算符来实现 TwoInputStreamOperator
接口,该接口在一个输入模式定义上接收,在另一个输入上接收流记录(类似于 CoFlatMap 函数)。对于每个新模式,您都必须在运算符上编译一个新的 NFA
,并将任何新的传入流元素也提供给这个 NFA
。这样,您就可以实现预期的行为。
未来,我们很可能会将此功能添加到 Flink 的 CEP 库中。