Apache Flink 过滤函数
Apache Flink Filter Function
我想在 Apache Flink 中实现自定义过滤器功能,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。
假设我的函数如下所示
public class CustomFilter implements FilterFunction{
@Override
public boolean filter(Object o) throws Exception{
String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
for(String s: values){
if(!o.toString().contains(s)) return false;
}
return true;
}
}
流式传输作业将如下所示:
public class StreamingJob{
...
env
.fromElements("Data","New Data","First")
.filter(new CustomFilter())
.print
.execute();
}
当我尝试向 class 中的 CustomFilter 函数参数添加某种集合时,例如
public boolean filter(String s, Collection<String> searchValues){
...
}
我得到的消息是该函数必须来自 String 类型,因为它是一个已实现的函数。
正如其他人指出的那样,只需保存您通过构造函数传入的目标值列表,然后在 filter()
方法中使用它们。
public class CustomFilter implements FilterFunction<Object> {
private String[] targetValues;
public CustomFilter(String[] targetValues) {
this.targetValues = targetValues;
}
}
我想在 Apache Flink 中实现自定义过滤器功能,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。
假设我的函数如下所示
public class CustomFilter implements FilterFunction{
@Override
public boolean filter(Object o) throws Exception{
String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
for(String s: values){
if(!o.toString().contains(s)) return false;
}
return true;
}
}
流式传输作业将如下所示:
public class StreamingJob{
...
env
.fromElements("Data","New Data","First")
.filter(new CustomFilter())
.print
.execute();
}
当我尝试向 class 中的 CustomFilter 函数参数添加某种集合时,例如
public boolean filter(String s, Collection<String> searchValues){
...
}
我得到的消息是该函数必须来自 String 类型,因为它是一个已实现的函数。
正如其他人指出的那样,只需保存您通过构造函数传入的目标值列表,然后在 filter()
方法中使用它们。
public class CustomFilter implements FilterFunction<Object> {
private String[] targetValues;
public CustomFilter(String[] targetValues) {
this.targetValues = targetValues;
}
}