为什么 Spark Streaming 中的多个 print() 方法会影响我列表中的值?
Why do multiple print() methods in Spark Streaming affect the values in my list?
我正在尝试每两秒接收一个 JSON 行,将它们存储在一个列表中,该列表包含我创建的服装 Class 中的元素,并在每个之后打印结果列表上下文的执行。所以我正在做这样的事情:
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaReceiverInputDStream<String> streamData = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<LinkedList<StreamValue>> getdatatoplace = streamData.map(new Function<String, LinkedList<StreamValue>>() {
@Override
public LinkedList<StreamValue> call(String s) throws Exception {
//Access specific attributes in the JSON
Gson gson = new Gson();
Type type = new TypeToken<Map<String, String>>() {
}.getType();
Map<String, String> retMap = gson.fromJson(s, type);
String a = retMap.get("exp");
String idValue = retMap.get("id");
//insert values into the stream_Value LinkedList
stream_Value.push(new StreamValue(idValue, a, UUID.randomUUID()));
return stream_Value;
}
});
getdatatoplace.print();
效果很好,我得到了以下结果:
//at the end of the first batch duration/cycle
getdatatoplace[]={json1}
//at the end of the second batch duration/cycle
getdatatoplace[]={json1,json2}
...
但是,如果我多次打印 getdatatoplace
,假设 3:
getdatatoplace.print();
getdatatoplace.print();
getdatatoplace.print();
然后我得到这个结果:
//at the end of the first print
getdatatoplace[]={json1}
//at the end of the second print
getdatatoplace[]={json1,json1}
//at the end of the third print
getdatatoplace[]={json1,json1,json1}
//Context ends with getdatatoplace.size()=3
//New cycle begins, and I get a new value json2
//at the end of the first print
getdatatoplace[]={json1,json1,json1,json2}
...
那么发生的事情是,对于我所做的每一次打印,即使我之前做了 stream_Value.push
,并且我在批处理持续时间内给出的命令还没有结束,stream_Value
推送为我所做的每张印刷品添加到我的列表中的值。
我的问题是,为什么会发生这种情况,我该如何做到这一点,独立于我使用的 print() 方法的数量,我每次只在列表中存储一个 JSON 行批量 Duration/per 执行。
我希望我没有混淆,因为我是 Spark 的新手,可能混淆了一些词汇。非常感谢。
PS:即使我打印另一个DStream,同样的事情也会发生。假设我这样做,每个都具有上面流的相同 'architecture':
JavaDStream1.print();
JavaDStream2.print();
在JavaDStream2.print()的最后,JavaDstream1中的列表有一个额外的值。
Spark Streaming 使用与 Spark 相同的计算模型。我们在数据上声明的操作形成一个有向无环图 (DAG),当使用操作来具体化数据上的此类计算时会对其进行评估。
在 Spark Streaming 中,输出操作,例如 print()
将在每个批处理间隔安排这些操作的具体化。
此流的 DAG 看起来像这样:
[TextStream]->[map]->[print]
print
将安排对 socketTextStream
接收到的数据执行 map
操作。当我们添加更多 print
个动作时,我们的 DAG 看起来像:
/->[map]->[print]
[TextStream] ->[map]->[print]
\->[map]->[print]
这里问题应该变得明显了。 map
操作执行了 3 次。这是预期的行为,通常不是问题,因为 map
应该是无状态转换。
这里问题的根本原因是 map 包含变异操作,因为它将元素添加到全局集合 stream_Value
中,该集合定义在传递给 map
的函数范围之外。
这不仅会导致重复问题,而且当 Spark Streaming 在其通常的集群模式下运行时,通常也不会起作用。
我正在尝试每两秒接收一个 JSON 行,将它们存储在一个列表中,该列表包含我创建的服装 Class 中的元素,并在每个之后打印结果列表上下文的执行。所以我正在做这样的事情:
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaReceiverInputDStream<String> streamData = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<LinkedList<StreamValue>> getdatatoplace = streamData.map(new Function<String, LinkedList<StreamValue>>() {
@Override
public LinkedList<StreamValue> call(String s) throws Exception {
//Access specific attributes in the JSON
Gson gson = new Gson();
Type type = new TypeToken<Map<String, String>>() {
}.getType();
Map<String, String> retMap = gson.fromJson(s, type);
String a = retMap.get("exp");
String idValue = retMap.get("id");
//insert values into the stream_Value LinkedList
stream_Value.push(new StreamValue(idValue, a, UUID.randomUUID()));
return stream_Value;
}
});
getdatatoplace.print();
效果很好,我得到了以下结果:
//at the end of the first batch duration/cycle
getdatatoplace[]={json1}
//at the end of the second batch duration/cycle
getdatatoplace[]={json1,json2}
...
但是,如果我多次打印 getdatatoplace
,假设 3:
getdatatoplace.print();
getdatatoplace.print();
getdatatoplace.print();
然后我得到这个结果:
//at the end of the first print
getdatatoplace[]={json1}
//at the end of the second print
getdatatoplace[]={json1,json1}
//at the end of the third print
getdatatoplace[]={json1,json1,json1}
//Context ends with getdatatoplace.size()=3
//New cycle begins, and I get a new value json2
//at the end of the first print
getdatatoplace[]={json1,json1,json1,json2}
...
那么发生的事情是,对于我所做的每一次打印,即使我之前做了 stream_Value.push
,并且我在批处理持续时间内给出的命令还没有结束,stream_Value
推送为我所做的每张印刷品添加到我的列表中的值。
我的问题是,为什么会发生这种情况,我该如何做到这一点,独立于我使用的 print() 方法的数量,我每次只在列表中存储一个 JSON 行批量 Duration/per 执行。
我希望我没有混淆,因为我是 Spark 的新手,可能混淆了一些词汇。非常感谢。
PS:即使我打印另一个DStream,同样的事情也会发生。假设我这样做,每个都具有上面流的相同 'architecture':
JavaDStream1.print();
JavaDStream2.print();
在JavaDStream2.print()的最后,JavaDstream1中的列表有一个额外的值。
Spark Streaming 使用与 Spark 相同的计算模型。我们在数据上声明的操作形成一个有向无环图 (DAG),当使用操作来具体化数据上的此类计算时会对其进行评估。
在 Spark Streaming 中,输出操作,例如 print()
将在每个批处理间隔安排这些操作的具体化。
此流的 DAG 看起来像这样:
[TextStream]->[map]->[print]
print
将安排对 socketTextStream
接收到的数据执行 map
操作。当我们添加更多 print
个动作时,我们的 DAG 看起来像:
/->[map]->[print]
[TextStream] ->[map]->[print]
\->[map]->[print]
这里问题应该变得明显了。 map
操作执行了 3 次。这是预期的行为,通常不是问题,因为 map
应该是无状态转换。
这里问题的根本原因是 map 包含变异操作,因为它将元素添加到全局集合 stream_Value
中,该集合定义在传递给 map
的函数范围之外。
这不仅会导致重复问题,而且当 Spark Streaming 在其通常的集群模式下运行时,通常也不会起作用。