如何在 Java 中迭代 DStream
How to Iterate DStream in Java
我是 Spark 编程的新手。我有一个 spark 流程序,它需要将接收到的 DStream 存储到 database.I 想要迭代我的 Dstream 并将每条记录存储到数据库中。
像这样。
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
DStream<String> dstream = socketTextStream.dstream();
// Iterate each record from the DStream and push it to DB
方法二:
这是正确的做法吗?这种方法会带来任何性能 gain/issue 吗?
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
List<String> collect = rdd.collect();
for (String string : collect) {
System.out.println(string);
}
return null;
}
});
您可以使用 JavaDStream.foreachRDD
and JavaRDD.foreach
:
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
socketTextStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
// Save data
}
});
}
});
或使用 Java 8 Lambda Expressions:
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
socketTextStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
rdd.foreach((VoidFunction<String>) s -> {
// Save data
});
});
编辑
由于您使用的是 Spark 1.2.0(有点旧,我建议升级(当前最新版本是 1.6.1,截至 2016 年 5 月 22 日)):
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
// Save data
}
});
return null;
}
});
我是 Spark 编程的新手。我有一个 spark 流程序,它需要将接收到的 DStream 存储到 database.I 想要迭代我的 Dstream 并将每条记录存储到数据库中。
像这样。
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
DStream<String> dstream = socketTextStream.dstream();
// Iterate each record from the DStream and push it to DB
方法二:
这是正确的做法吗?这种方法会带来任何性能 gain/issue 吗?
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
List<String> collect = rdd.collect();
for (String string : collect) {
System.out.println(string);
}
return null;
}
});
您可以使用 JavaDStream.foreachRDD
and JavaRDD.foreach
:
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
socketTextStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
// Save data
}
});
}
});
或使用 Java 8 Lambda Expressions:
JavaStreamingContext streamingContext = getSparkStreamingContext();
JavaReceiverInputDStream<String> socketTextStream = streamingContext
.socketTextStream("localhost", 8080);
socketTextStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> {
rdd.foreach((VoidFunction<String>) s -> {
// Save data
});
});
编辑
由于您使用的是 Spark 1.2.0(有点旧,我建议升级(当前最新版本是 1.6.1,截至 2016 年 5 月 22 日)):
socketTextStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
// Save data
}
});
return null;
}
});