如何在 Scala 中使用 Flink 应用简单过滤器
How to apply a simple filter with Flink in Scala
我使用的是旧版本的 Flink。我升级到 1.2.0,但我遇到了一些过滤器问题。
我有一个日志数据流,它工作得很好:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
我当然已经阅读了显示的文档:
dataStream.filter { _ != 0 }
我尝试了很多这样的事情:
val cleanLogs = logs.filter { _.isComplete }
但我收到以下错误:
Type mismatch, expected: FilterFunction[Log], actual: (Any) => An
所以我没有看到文档和此错误之间的 link。
有什么帮助吗?示例 ?
谢谢
问题首先是 StreamExecutionEnvironment
的错误导入导致 filter
.
等基本功能出现此问题
然后,当我使用旧版本的 Flink 时,我使用的是 LocalExecutionEnvironment
class,它在 Flink 1.X.
中不再可用
相反:StreamExecutionEnvironment.createLocalEnvironment(1)
我使用的是旧版本的 Flink。我升级到 1.2.0,但我遇到了一些过滤器问题。
我有一个日志数据流,它工作得很好:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
我当然已经阅读了显示的文档:
dataStream.filter { _ != 0 }
我尝试了很多这样的事情:
val cleanLogs = logs.filter { _.isComplete }
但我收到以下错误:
Type mismatch, expected: FilterFunction[Log], actual: (Any) => An
所以我没有看到文档和此错误之间的 link。 有什么帮助吗?示例 ?
谢谢
问题首先是 StreamExecutionEnvironment
的错误导入导致 filter
.
然后,当我使用旧版本的 Flink 时,我使用的是 LocalExecutionEnvironment
class,它在 Flink 1.X.
相反:StreamExecutionEnvironment.createLocalEnvironment(1)