Spark 程序结构:类 中的广播变量与最终静态与外部静态属性
Spark program structure: broadcast variables vs final static vs external static attributes in classes
我有一个应用程序应该从文件中读取一些行并将它们作为最终变量用作参考。
截至目前,在 spark 上下文开始之前,我在 class(称为 People)class which
中启动了一个静态方法
reads the file;
fill a final static HashTable;
static{ hashTable.put(eachline);}
在我的转换代码中,例如:
JavaRDD<String> filteredRDD = anotherRDD.filter( new Function<String,Boolean>(){
public Boolean call(String s){
People.hashTable.containsKey(s);
}
});
疑问:
- 我应该使用在
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStream").setMaster("local[2]");
声明后立即初始化的广播变量吗?
- 为什么我应该选择一个高于最后一个的广播变量?据我所见,final在转换流程里面顺利通过
- 是correct/elegant程序在SparkStreaming计算开始前加载某处文件内容吗?
- 如果我有外部 classes 负责一些计算(主要是为了可读性),我更愿意以静态方式访问这些方法还是实例化 class e 在
rdd.foreachPartition(....
?
1) 我们应该使用广播变量吗?
是
2) 广播变量 vs 静态初始化变量
不要使用静态变量来传递序列化数据。
一般来说,当我们将作业扩展到集群中超过 1 台机器时,带有数据 的静态变量将无法与 Spark 一起使用(看起来它不起作用, b/c Spark 运行 处于 local(2)
模式。
静态字段是对象初始化的一部分,而不是序列化形式的一部分,因为它们 can/should 在远程处理操作的接收端重建。请注意,它 可以 工作,如果对象足够智能以在反序列化时重建其内容。
相反,我们可以使用可以序列化的普通实例。 (比如 mydata = new HashMap<>(); mydata.put(...)
2.1) 实例变量与广播变量
假设我们有一个大型数据集,有 420 个分区和一个由 8 个执行节点组成的集群。在这样的操作中:
val referenceData = Map(...)
val filtered = rdd.filter(elem => referenceData.get(elem) > 10)
referenceData
对象将被序列化 420 次,或执行转换所需的任务数。
而是一个广播变量:
val referenceDataBC = sparkContext.broadcast(Map(...))
val filtered = rdd.filter(elem => referenceDataBC.value.get(elem) > 10)
将发送给每个执行者一次,或总共发送 8 次。因此,通过减少序列化开销节省了大量网络和 CPU。
3) 是correct/elegant程序在SparkStreaming计算开始之前将文件内容加载到某处吗?
我们需要在流处理开始之前加载外部数据。我们还有哪些选择?
4) 以静态方式使用函数或在 rdd.foreachPartition(....)
中实例化 class
取决于函数是否需要 class 提供的上下文。
例如需要上下文:
rdd.foreachPartition{ iter =>
val jsonParser = new JsonParser(validation)
val parsed = iter.map(jsonParser.parse)
...
}
例如不需要上下文
vectors.foreachPartition{ iter =>
val magnitudes = iter.map(elem => MyVectorMath.modulus(elem))
}
我有一个应用程序应该从文件中读取一些行并将它们作为最终变量用作参考。
截至目前,在 spark 上下文开始之前,我在 class(称为 People)class which
reads the file;
fill a final static HashTable;
static{ hashTable.put(eachline);}
在我的转换代码中,例如:
JavaRDD<String> filteredRDD = anotherRDD.filter( new Function<String,Boolean>(){
public Boolean call(String s){
People.hashTable.containsKey(s);
}
});
疑问:
- 我应该使用在
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStream").setMaster("local[2]");
声明后立即初始化的广播变量吗? - 为什么我应该选择一个高于最后一个的广播变量?据我所见,final在转换流程里面顺利通过
- 是correct/elegant程序在SparkStreaming计算开始前加载某处文件内容吗?
- 如果我有外部 classes 负责一些计算(主要是为了可读性),我更愿意以静态方式访问这些方法还是实例化 class e 在
rdd.foreachPartition(....
?
1) 我们应该使用广播变量吗?
是
2) 广播变量 vs 静态初始化变量
不要使用静态变量来传递序列化数据。
一般来说,当我们将作业扩展到集群中超过 1 台机器时,带有数据 的静态变量将无法与 Spark 一起使用(看起来它不起作用, b/c Spark 运行 处于 local(2)
模式。
静态字段是对象初始化的一部分,而不是序列化形式的一部分,因为它们 can/should 在远程处理操作的接收端重建。请注意,它 可以 工作,如果对象足够智能以在反序列化时重建其内容。
相反,我们可以使用可以序列化的普通实例。 (比如 mydata = new HashMap<>(); mydata.put(...)
2.1) 实例变量与广播变量
假设我们有一个大型数据集,有 420 个分区和一个由 8 个执行节点组成的集群。在这样的操作中:
val referenceData = Map(...)
val filtered = rdd.filter(elem => referenceData.get(elem) > 10)
referenceData
对象将被序列化 420 次,或执行转换所需的任务数。
而是一个广播变量:
val referenceDataBC = sparkContext.broadcast(Map(...))
val filtered = rdd.filter(elem => referenceDataBC.value.get(elem) > 10)
将发送给每个执行者一次,或总共发送 8 次。因此,通过减少序列化开销节省了大量网络和 CPU。
3) 是correct/elegant程序在SparkStreaming计算开始之前将文件内容加载到某处吗?
我们需要在流处理开始之前加载外部数据。我们还有哪些选择?
4) 以静态方式使用函数或在 rdd.foreachPartition(....)
中实例化 class取决于函数是否需要 class 提供的上下文。 例如需要上下文:
rdd.foreachPartition{ iter =>
val jsonParser = new JsonParser(validation)
val parsed = iter.map(jsonParser.parse)
...
}
例如不需要上下文
vectors.foreachPartition{ iter =>
val magnitudes = iter.map(elem => MyVectorMath.modulus(elem))
}