Spark 流套接字流示例不起作用

Spark streaming socket stream exemple not working

我正在尝试使用 Spark Streaming,但我卡在了第一个例子中:

import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) {
        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words = lines.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override public Iterable<String> call(String x) {
                        return Arrays.asList(x.split(" "));
                    }
                });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();
    }
}

这段代码实际上是文档的副本。

https://spark.apache.org/docs/1.6.3/streaming-programming-guide.html

我像这样设置了一个 netcat 服务器:

nc -lk 9999

和类似的 obne nc 客户端:

nc localhost 9999

我在其中输入如下句子:

世界,您好! 世界您好! \n

在 netcat 服务器上正确显示。

但它不起作用。每一批我都只有一张空白的照片。

21/02/16 00:36:41 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[137] at socketTextStream at NetworkWordCount.java:17 of time 1613432201000 ms
21/02/16 00:36:41 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1613432199000 ms)
21/02/16 00:36:41 INFO InputInfoTracker: remove old batch metadata: 1613432199000 ms
-------------------------------------------
Time: 1613432201000 ms
-------------------------------------------

我尝试了 setMaster 的不同值,例如 local[4]local[2]local[*],但结果是一样的。

此外,如果我 运行 netcat 客户端之前的 spark 流代码,我什至看不到 nc 服务器上的字符串。

我找到了解决问题的方法。

简而言之,您需要直接在终端 运行 TCP 服务器中写入消息,您不需要另一个 netcat 客户端。

没有错误或缺少配置,这只是对 netcat 工作方式的误解。

我从 man nc 了解到 -k 选项允许 netcat 管理多个连接,但部分错误。

-k      When a connection is completed, listen for another one.  Requires -l.  When used together with the -u option, the server socket is not connected and it can receive UDP datagrams from multiple
             hosts.

但确实和我想的完全不一样。如果您使用 -k 选项设置 netcat 服务器,那么它将接受多个连接,但仍会一次处理一个。

这意味着如果您有 2 个 nc 客户端,并且如果您在两个客户端中都输入了一些文本,则在您关闭第一个连接之前,服务器只会接收其中一个的文本。