Spark 广播一个没有空指针的 HashMap 但它也不获取任何值
Spark Broadcasting a HashMap no nullpointer but it doesnt fetch any values aswell
我正在广播一个 hashmap 并从下面的方法返回一个映射
public static Map<Object1, Object2> lkpBC (JavaSparkContext ctx, String FilePath) {
Broadcast<Map<Object1, Object2>> CodeBC = null;
Map<Object1, Object2> codePairMap=null;
try {
Map<Object1, Object2> CodepairMap = LookupUtil.loadLookup(ctx, FilePath);
CodeBC = ctx.broadcast(codePairMap);
codePairMap= CodeBC.value();
} catch (Exception e) {
LOG.error("Error while broadcasting ", e);
}
return codePairMap;
}
并将地图传递给下面的方法
public static JavaRDD<Object3> fetchDetails(
JavaSparkContext ctx,
JavaRDD<Object3> CleanFileRDD,
String FilePath,
Map<Object1, Object2> BcMap
) {
JavaRDD<Object3r> assignCd = CleanFileRDD.map(row -> {
object3 FileData = null;
try {
FileData = row;
if (BCMap.containsKey("some key")) {......}
} catch (Exception e) {
LOG.error("Error in Map function ", e);
}
return some object;
});
return assignCd;
}
在本地模式下,它工作正常,没有任何问题,但是当我 运行 在 EC2 上的 spark 独立集群(1 master 3 slaves)上,这不会获取任何值,也不会引发错误。您在方法中看到的所有对象都已序列化。如果我从主 class 或任何其他不同的 class 调用这些方法有关系吗?
PS: 我们在 spark conf
中使用了 Kyro 序列化器
我想这是怎么回事,您没有在 map 函数的闭包内访问广播变量。我认为您正在直接访问底层 BcMap
(或 BCMap
,不确定它们是否应该不同)。
行 if (BCMap.containsKey("some key"))
未访问广播变量 CodeBC
。因为看起来 BCMap
的类型是 Map
,而不是 Broadcast
.
要访问广播变量,您将调用 CodeBC.value.containsKey
。
Spark 是以函数式方式设计的,它不会 "do" 底层映射的任何内容,它会制作它的副本,广播该副本,并将该副本包装在 Broadcast
类型。
我不知道 LookupUtil.loadLookup
是做什么的,但我想如果文件不存在或为空,那么它 return 是一张空地图吗?
下面是一个如何在 Scala 中执行此操作的示例:
val bcMap = ctx.broadcast(LookupUtil.loadLookup(ctx, FilePath))
cleanFileRDD.map(row =>
if (bcMap.value.containsKey("some key") ...
else ...)
我想你会按照我一个朋友的智慧的话来解决你的问题 "first solve all the obvious issues, then the harder issues seem to solve themselves"。在您的情况下,它们是:
- 使用初始化为 null 的可变变量
- 使用 try catches 记录错误但不重新抛出它们。让异常冒出来。
- 在将其作为一种方法工作之前,过早地将事情分成许多不同的方法。
仅仅因为某些东西在本地工作并不意味着它在分发时也能工作。 运行 本地和跨集群之间有很多差异,例如:a) 数据局部性 b) 序列化 c) 闭包捕获 d) 线程数 e) 执行顺序 ... 等
我正在广播一个 hashmap 并从下面的方法返回一个映射
public static Map<Object1, Object2> lkpBC (JavaSparkContext ctx, String FilePath) {
Broadcast<Map<Object1, Object2>> CodeBC = null;
Map<Object1, Object2> codePairMap=null;
try {
Map<Object1, Object2> CodepairMap = LookupUtil.loadLookup(ctx, FilePath);
CodeBC = ctx.broadcast(codePairMap);
codePairMap= CodeBC.value();
} catch (Exception e) {
LOG.error("Error while broadcasting ", e);
}
return codePairMap;
}
并将地图传递给下面的方法
public static JavaRDD<Object3> fetchDetails(
JavaSparkContext ctx,
JavaRDD<Object3> CleanFileRDD,
String FilePath,
Map<Object1, Object2> BcMap
) {
JavaRDD<Object3r> assignCd = CleanFileRDD.map(row -> {
object3 FileData = null;
try {
FileData = row;
if (BCMap.containsKey("some key")) {......}
} catch (Exception e) {
LOG.error("Error in Map function ", e);
}
return some object;
});
return assignCd;
}
在本地模式下,它工作正常,没有任何问题,但是当我 运行 在 EC2 上的 spark 独立集群(1 master 3 slaves)上,这不会获取任何值,也不会引发错误。您在方法中看到的所有对象都已序列化。如果我从主 class 或任何其他不同的 class 调用这些方法有关系吗?
PS: 我们在 spark conf
中使用了 Kyro 序列化器我想这是怎么回事,您没有在 map 函数的闭包内访问广播变量。我认为您正在直接访问底层 BcMap
(或 BCMap
,不确定它们是否应该不同)。
行 if (BCMap.containsKey("some key"))
未访问广播变量 CodeBC
。因为看起来 BCMap
的类型是 Map
,而不是 Broadcast
.
要访问广播变量,您将调用 CodeBC.value.containsKey
。
Spark 是以函数式方式设计的,它不会 "do" 底层映射的任何内容,它会制作它的副本,广播该副本,并将该副本包装在 Broadcast
类型。
我不知道 LookupUtil.loadLookup
是做什么的,但我想如果文件不存在或为空,那么它 return 是一张空地图吗?
下面是一个如何在 Scala 中执行此操作的示例:
val bcMap = ctx.broadcast(LookupUtil.loadLookup(ctx, FilePath))
cleanFileRDD.map(row =>
if (bcMap.value.containsKey("some key") ...
else ...)
我想你会按照我一个朋友的智慧的话来解决你的问题 "first solve all the obvious issues, then the harder issues seem to solve themselves"。在您的情况下,它们是:
- 使用初始化为 null 的可变变量
- 使用 try catches 记录错误但不重新抛出它们。让异常冒出来。
- 在将其作为一种方法工作之前,过早地将事情分成许多不同的方法。
仅仅因为某些东西在本地工作并不意味着它在分发时也能工作。 运行 本地和跨集群之间有很多差异,例如:a) 数据局部性 b) 序列化 c) 闭包捕获 d) 线程数 e) 执行顺序 ... 等