使用 Scala 实现 Apache Flink 维基百科编辑分析
Apache Flink Wikipedia edit analytics with Scala
我正在尝试把 Apache Flink 教程(https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html)中的维基百科编辑流分析示例
用 Scala 重写。
教程中的 Java 代码如下:
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
/**
 * Streaming job that reads Wikipedia edit events and prints, per user and per
 * 5-second window, the accumulated byte diff of that user's edits.
 */
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
// Obtain the streaming execution environment for this job.
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// Register the Wikipedia edits connector as the stream source.
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
// Key the stream by the user name returned by getUser().
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
// For each key, fold the events of every 5-second window into (user, summed byte diff),
// starting from the initial accumulator ("", 0L).
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
// The accumulator is updated in place and returned.
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
// Print the per-window results, then execute the job.
result.print();
see.execute();
}
}
下面是我用 Scala 写的尝试:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time
// First Scala port of the tutorial job (the version the question is about).
object WikipediaAnalytics extends App{
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Source of Wikipedia edit events.
val edits = env.addSource(new WikipediaEditsSource());
// Key the stream by user name.
val keyedEdits = edits.keyBy(event => event.getUser)
// NOTE(review): both the initial accumulator and the fold function are passed in a single
// argument list here, and the function takes (event, accumulator) in that order.
val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
(we.getUser, t._2 + we.getByteDiff))
}
这基本上是逐字逐句的转换,据此 val result
的类型应该是 DataStream[(String, Long)],
但在 fold()
之后实际推断出的类型却相去甚远。
请帮忙看看这段 Scala 代码哪里有问题。
EDIT1:利用 fold[R]
的柯里化(currying)形式做了如下修改,现在类型与预期一致,但我仍然不明白原因:
// Supplying only the first argument list of fold (the initial accumulator); the annotated type
// shows what remains: a function expecting the (accumulator, event) => accumulator fold function.
val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))
// Applying that function to the fold logic produces the expected DataStream[(String, Long)].
val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
(we.getUser, t._2 + we.getByteDiff))
问题似乎出在 fold 的调用上:累加器初始值之后必须紧跟一个右括号(fold 有两个参数列表,折叠函数属于第二个参数列表)。修复这一点之后,代码仍然无法编译,因为作用域内没有可用于 WikipediaEditEvent 的 TypeInformation。最简单的解决办法是导入整个 Flink Scala API(org.apache.flink.streaming.api.scala._)。完整示例如下:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time
/** Prints, per user and per 5-second window, the summed byte diff of that user's Wikipedia edits. */
object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Stream of Wikipedia edit events.
  val edits = see.addSource(new WikipediaEditsSource())

  // Partition by user name, then cut each partition into 5-second windows.
  private val editsByUser = edits.keyBy(event => event.getUser)
  private val fiveSecondWindows = editsByUser.timeWindow(Time.seconds(5))

  // The initial accumulator goes in the first argument list, the fold function in the second.
  val userEditsVolume: DataStream[(String, Int)] =
    fiveSecondWindows.fold(("", 0)) { (acc, event) =>
      (event.getUser, acc._2 + event.getByteDiff)
    }

  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}
我正在尝试把 Apache Flink 教程(https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html)中的维基百科编辑流分析示例
用 Scala 重写。教程中的 Java 代码如下:
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
/**
 * Streaming job that reads Wikipedia edit events and prints, per user and per
 * 5-second window, the accumulated byte diff of that user's edits.
 */
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception {
// Obtain the streaming execution environment for this job.
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
// Register the Wikipedia edits connector as the stream source.
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
// Key the stream by the user name returned by getUser().
KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
.keyBy(new KeySelector<WikipediaEditEvent, String>() {
@Override
public String getKey(WikipediaEditEvent event) {
return event.getUser();
}
});
// For each key, fold the events of every 5-second window into (user, summed byte diff),
// starting from the initial accumulator ("", 0L).
DataStream<Tuple2<String, Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
// The accumulator is updated in place and returned.
acc.f0 = event.getUser();
acc.f1 += event.getByteDiff();
return acc;
}
});
// Print the per-window results, then execute the job.
result.print();
see.execute();
}
}
下面是我用 Scala 写的尝试:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.windowing.time.Time
// First Scala port of the tutorial job (the version the question is about).
object WikipediaAnalytics extends App{
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Source of Wikipedia edit events.
val edits = env.addSource(new WikipediaEditsSource());
// Key the stream by user name.
val keyedEdits = edits.keyBy(event => event.getUser)
// NOTE(review): both the initial accumulator and the fold function are passed in a single
// argument list here, and the function takes (event, accumulator) in that order.
val result = keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L), (we: WikipediaEditEvent, t: (String, Long)) =>
(we.getUser, t._2 + we.getByteDiff))
}
这基本上是逐字逐句的转换,据此 val result
的类型应该是 DataStream[(String, Long)],
但在 fold()
之后实际推断出的类型却相去甚远。
请帮忙看看这段 Scala 代码哪里有问题。
EDIT1:利用 fold[R]
的柯里化(currying)形式做了如下修改,现在类型与预期一致,但我仍然不明白原因:
// Supplying only the first argument list of fold (the initial accumulator); the annotated type
// shows what remains: a function expecting the (accumulator, event) => accumulator fold function.
val result_1: (((String, Long), WikipediaEditEvent) => (String, Long)) => DataStream[(String, Long)] =
keyedEdits.timeWindow(Time.seconds(5)).fold(("", 0L))
// Applying that function to the fold logic produces the expected DataStream[(String, Long)].
val result_2: DataStream[(String, Long)] = result_1((t: (String, Long), we: WikipediaEditEvent ) =>
(we.getUser, t._2 + we.getByteDiff))
问题似乎出在 fold 的调用上:累加器初始值之后必须紧跟一个右括号(fold 有两个参数列表,折叠函数属于第二个参数列表)。修复这一点之后,代码仍然无法编译,因为作用域内没有可用于 WikipediaEditEvent 的 TypeInformation。最简单的解决办法是导入整个 Flink Scala API(org.apache.flink.streaming.api.scala._)。完整示例如下:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource
import org.apache.flink.streaming.api.windowing.time.Time
/** Prints, per user and per 5-second window, the summed byte diff of that user's Wikipedia edits. */
object WikipediaAnalytics extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Stream of Wikipedia edit events.
  val edits = see.addSource(new WikipediaEditsSource())

  // Partition by user name, then cut each partition into 5-second windows.
  private val editsByUser = edits.keyBy(event => event.getUser)
  private val fiveSecondWindows = editsByUser.timeWindow(Time.seconds(5))

  // The initial accumulator goes in the first argument list, the fold function in the second.
  val userEditsVolume: DataStream[(String, Int)] =
    fiveSecondWindows.fold(("", 0)) { (acc, event) =>
      (event.getUser, acc._2 + event.getByteDiff)
    }

  userEditsVolume.print()
  see.execute("Wikipedia User Edit Volume")
}