如何在 Scala 中使用 Flink 应用简单过滤器

How to apply a simple filter with Flink in Scala

我使用的是旧版本的 Flink。我升级到 1.2.0,但我遇到了一些过滤器问题。

我有一个日志数据流,它工作得很好:

 val logs: DataStream[Log] = env.addSource(new LogSource(
      data, delay, factor))

 // DISPLAY TUPLE IN CONSOLE
 logs.print()

 // EXECUTE SCRIPT
 env.execute("stream")

我当然已经阅读了显示的文档:

dataStream.filter { _ != 0 }

我尝试了很多这样的事情:

val cleanLogs = logs.filter { _.isComplete }

但我收到以下错误:

Type mismatch, expected: FilterFunction[Log], actual: (Any) => An

所以我没有看到文档和此错误之间的 link。 有什么帮助吗?示例 ?

谢谢

问题首先是 StreamExecutionEnvironment 的错误导入导致 filter.

等基本功能出现此问题

然后,当我使用旧版本的 Flink 时,我使用的是 LocalExecutionEnvironment class,它在 Flink 1.X.

中不再可用

相反:StreamExecutionEnvironment.createLocalEnvironment(1)