为什么 Spark Streaming 中的多个 print() 方法会影响我列表中的值?

Why do multiple print() methods in Spark Streaming affect the values in my list?

我正在尝试每两秒接收一个 JSON 行,将它们存储在一个列表中,该列表包含我创建的服装 Class 中的元素,并在每个之后打印结果列表上下文的执行。所以我正在做这样的事情:

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    JavaReceiverInputDStream<String> streamData = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);


    JavaDStream<LinkedList<StreamValue>> getdatatoplace = streamData.map(new Function<String, LinkedList<StreamValue>>() {
                @Override
                public LinkedList<StreamValue> call(String s) throws Exception {
                    //Access specific attributes in the JSON
                    Gson gson = new Gson();
                    Type type = new TypeToken<Map<String, String>>() {
                    }.getType();
                    Map<String, String> retMap = gson.fromJson(s, type);
                    String a = retMap.get("exp");

                    String idValue = retMap.get("id");

                    //insert values into the stream_Value LinkedList
                    stream_Value.push(new StreamValue(idValue, a, UUID.randomUUID()));


                    return stream_Value;

                }
            });

getdatatoplace.print();

效果很好,我得到了以下结果:

//at the end of the first batch duration/cycle
 getdatatoplace[]={json1}

//at the end of the second batch duration/cycle
 getdatatoplace[]={json1,json2}

...

但是,如果我多次打印 getdatatoplace,假设 3:

 getdatatoplace.print();
 getdatatoplace.print();
 getdatatoplace.print();

然后我得到这个结果:

 //at the end of the first print
 getdatatoplace[]={json1}

//at the end of the second print
 getdatatoplace[]={json1,json1}

 //at the end of the third print
 getdatatoplace[]={json1,json1,json1}

//Context ends with getdatatoplace.size()=3

//New cycle begins, and I get a new value json2

 //at the end of the first print
 getdatatoplace[]={json1,json1,json1,json2}
...

那么发生的事情是,对于我所做的每一次打印,即使我之前做了 stream_Value.push,并且我在批处理持续时间内给出的命令还没有结束,stream_Value 推送为我所做的每张印刷品添加到我的列表中的值。

我的问题是,为什么会发生这种情况,我该如何做到这一点,独立于我使用的 print() 方法的数量,我每次只在列表中存储一个 JSON 行批量 Duration/per 执行。

我希望我没有混淆,因为我是 Spark 的新手,可能混淆了一些词汇。非常感谢。

PS:即使我打印另一个DStream,同样的事情也会发生。假设我这样做,每个都具有上面流的相同 'architecture':

JavaDStream1.print();
JavaDStream2.print();

在JavaDStream2.print()的最后,JavaDstream1中的列表有一个额外的值。

Spark Streaming 使用与 Spark 相同的计算模型。我们在数据上声明的操作形成一个有向无环图 (DAG),当使用操作来具体化数据上的此类计算时会对其进行评估。

在 Spark Streaming 中,输出操作,例如 print() 将在每个批处理间隔安排这些操作的具体化。

此流的 DAG 看起来像这样:

[TextStream]->[map]->[print]

print 将安排对 socketTextStream 接收到的数据执行 map 操作。当我们添加更多 print 个动作时,我们的 DAG 看起来像:

            /->[map]->[print]
[TextStream] ->[map]->[print]
            \->[map]->[print] 

这里问题应该变得明显了。 map 操作执行了 3 次。这是预期的行为,通常不是问题,因为 map 应该是无状态转换。

这里问题的根本原因是 map 包含变异操作,因为它将元素添加到全局集合 stream_Value 中,该集合定义在传递给 map 的函数范围之外。

这不仅会导致重复问题,而且当 Spark Streaming 在其通常的集群模式下运行时,通常也不会起作用。