Spark 流套接字流示例不起作用
Spark streaming socket stream exemple not working
我正在尝试使用 Spark Streaming,但我卡在了第一个例子中:
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
public class NetworkWordCount {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination();
}
}
这段代码实际上是文档的副本。
https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html
我像这样设置了一个 netcat 服务器:
nc -lk 9999
和类似的 obne nc 客户端:
nc localhost 9999
我在其中输入如下句子:
世界,您好!
世界您好! \n
在 netcat 服务器上正确显示。
但它不起作用。每一批我都只有一张空白的照片。
21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
-------------------------------------------
Time: 1613432201000 ms
-------------------------------------------
我尝试了 setMaster
的不同值,例如 local[4]
、local[2]
和 local[*]
,但结果是一样的。
此外,如果我 运行 netcat 客户端之前的 spark 流代码,我什至看不到 nc 服务器上的字符串。
我找到了解决问题的方法。
简而言之,您需要直接在终端 运行 TCP 服务器中写入消息,您不需要另一个 netcat 客户端。
没有错误或缺少配置,这只是对 netcat 工作方式的误解。
我从 man nc
了解到 -k
选项允许 netcat 管理多个连接,但部分错误。
-k When a connection is completed, listen for another one. Requires -l. When used together with the -u option, the server socket is not connected and it can receive UDP datagrams from multiple
hosts.
但确实和我想的完全不一样。如果您使用 -k
选项设置 netcat 服务器,那么它将接受多个连接,但仍会一次处理一个。
这意味着如果您有 2 个 nc 客户端,并且如果您在两个客户端中都输入了一些文本,则在您关闭第一个连接之前,服务器只会接收其中一个的文本。
我正在尝试使用 Spark Streaming,但我卡在了第一个例子中:
import java.util.Arrays;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
public class NetworkWordCount {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination();
}
}
这段代码实际上是文档的副本。
https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html
我像这样设置了一个 netcat 服务器:
nc -lk 9999
和类似的 obne nc 客户端:
nc localhost 9999
我在其中输入如下句子:
世界,您好! 世界您好! \n
在 netcat 服务器上正确显示。
但它不起作用。每一批我都只有一张空白的照片。
21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
-------------------------------------------
Time: 1613432201000 ms
-------------------------------------------
我尝试了 setMaster
的不同值,例如 local[4]
、local[2]
和 local[*]
,但结果是一样的。
此外,如果我 运行 netcat 客户端之前的 spark 流代码,我什至看不到 nc 服务器上的字符串。
我找到了解决问题的方法。
简而言之,您需要直接在终端 运行 TCP 服务器中写入消息,您不需要另一个 netcat 客户端。
没有错误或缺少配置,这只是对 netcat 工作方式的误解。
我从 man nc
了解到 -k
选项允许 netcat 管理多个连接,但部分错误。
-k When a connection is completed, listen for another one. Requires -l. When used together with the -u option, the server socket is not connected and it can receive UDP datagrams from multiple
hosts.
但确实和我想的完全不一样。如果您使用 -k
选项设置 netcat 服务器,那么它将接受多个连接,但仍会一次处理一个。
这意味着如果您有 2 个 nc 客户端,并且如果您在两个客户端中都输入了一些文本,则在您关闭第一个连接之前,服务器只会接收其中一个的文本。