Apache Flink 过滤函数

Apache Flink Filter Function

我想在 Apache Flink 中实现自定义过滤器功能,但我不知道如何在不硬连接的情况下将过滤条件列表注入其中。

假设我的函数如下所示

public class CustomFilter implements FilterFunction{

  @Override
  public boolean filter(Object o) throws Exception{
  String[] values = {"First","Second","Last"}; <-- How can i pass this Array or Collection to my Filter function?
     for(String s: values){
        if(!o.toString().contains(s)) return false;
     }
  return true;
  }
}

流式传输作业将如下所示:

public class StreamingJob{
...
env 
    .fromElements("Data","New Data","First")
    .filter(new CustomFilter())
    .print
    .execute();
}

当我尝试向 class 中的 CustomFilter 函数参数添加某种集合时,例如

public boolean filter(String s, Collection<String> searchValues){
    ...
}

我得到的消息是该函数必须来自 String 类型,因为它是一个已实现的函数。

正如其他人指出的那样,只需保存您通过构造函数传入的目标值列表,然后在 filter() 方法中使用它们。

public class CustomFilter implements FilterFunction<Object> {

    private String[] targetValues;

    public CustomFilter(String[] targetValues) {
        this.targetValues = targetValues;
    }


}