在 apache spark 中使用广播变量
Using broadcast variables in apache spark
我在 CoGroupFunction 中使用广播变量时遇到错误。如果把 appProvider.value()
这一行注释掉,错误就会消失。你知道如何解决这个问题吗?这个错误与变量的定义方式或初始化方式有关吗?
// Spark driver job. The class is Serializable because the cogroup lambda below
// reads the instance field appProvider (i.e. this.appProvider), which forces
// Spark to serialize the whole enclosing UsageJobDS instance into the closure.
public class UsageJobDS implements Serializable{
private static final Logger log = org.apache.log4j.LogManager.getLogger("myLogger");
// Broadcast handle, read on executors via appProvider.value().
Broadcast<Provider> appProvider;
void init(){
// init broadcast variable
....
}
public static void main(String[] args) {
UsageJobDS ujb = new UsageJobDS();
ujb.init();
ujb.run();
}
void run(){
KeyValueGroupedDataset<Long, Row> charges = usageCharges.groupByKey(x -> x.getLong(x.fieldIndex("si__subscription_id")), Encoders.LONG());
Dataset<ProcessEdr> cogg = edrs.cogroup(charges, rateEDRs, Encoders.bean(ProcessEdr.class));
log.warn("Count cogg " + cogg.count());
}
// NOTE(review): a lambda stored in a non-transient field of a Serializable class
// is written out as java.lang.invoke.SerializedLambda; when the captured
// UsageJobDS instance is deserialized on the executor, Java tries to assign that
// SerializedLambda back into this CoGroupFunction-typed field, which matches the
// reported ClassCastException. Presumably marking the field transient/static or
// passing the lambda inline avoids it — confirm against the Spark closure docs.
// NOTE(review): the while loop below never calls edrsIter.next(), so if the
// iterator is non-empty, hasNext() stays true and the loop never terminates.
// `results` is also not defined anywhere in this snippet.
CoGroupFunction<Long, EDR2, Row, ProcessEdr> rateEDRs = (subscription_id, edrsIter, chargesIter) -> {
Logger log = org.apache.log4j.LogManager.getLogger("myLogger");
log.warn("inside rateEDRs function");
while (edrsIter.hasNext()) {
appProvider.value(); // HERE
}
return results.iterator();
};
}
我收到了这个错误
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.opencell.spark.jobs.UsageJobDS.rateEDRs of type org.apache.spark.api.java.function.CoGroupFunction in instance of org.opencell.spark.jobs.UsageJobDS
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
实际上,如果将 cogroup 函数定义更改为以下代码,它就可以工作。但是,错误原因仍然未知。
// Working variant: the lambda is passed directly to cogroup instead of being
// stored in a CoGroupFunction-typed instance field, so no SerializedLambda is
// ever deserialized into that field — presumably why the ClassCastException
// disappears; TODO confirm root cause.
Dataset<ProcessEdr> cogg = edrs.cogroup(charges, (subscription_id, edrsIter, chargesIter) -> {
ArrayList<ProcessEdr> results = new ArrayList<>();
System.out.println("App Provider name" + appProvider.value().getIssuer_name());
return results.iterator();
}, Encoders.bean(ProcessEdr.class));
我在 CoGroupFunction 中使用广播变量时遇到错误。如果把 appProvider.value()
这一行注释掉,错误就会消失。你知道如何解决这个问题吗?这个错误与变量的定义方式或初始化方式有关吗?
// Spark driver job. The class is Serializable because the cogroup lambda below
// reads the instance field appProvider (i.e. this.appProvider), which forces
// Spark to serialize the whole enclosing UsageJobDS instance into the closure.
public class UsageJobDS implements Serializable{
private static final Logger log = org.apache.log4j.LogManager.getLogger("myLogger");
// Broadcast handle, read on executors via appProvider.value().
Broadcast<Provider> appProvider;
void init(){
// init broadcast variable
....
}
public static void main(String[] args) {
UsageJobDS ujb = new UsageJobDS();
ujb.init();
ujb.run();
}
void run(){
KeyValueGroupedDataset<Long, Row> charges = usageCharges.groupByKey(x -> x.getLong(x.fieldIndex("si__subscription_id")), Encoders.LONG());
Dataset<ProcessEdr> cogg = edrs.cogroup(charges, rateEDRs, Encoders.bean(ProcessEdr.class));
log.warn("Count cogg " + cogg.count());
}
// NOTE(review): a lambda stored in a non-transient field of a Serializable class
// is written out as java.lang.invoke.SerializedLambda; when the captured
// UsageJobDS instance is deserialized on the executor, Java tries to assign that
// SerializedLambda back into this CoGroupFunction-typed field, which matches the
// reported ClassCastException. Presumably marking the field transient/static or
// passing the lambda inline avoids it — confirm against the Spark closure docs.
// NOTE(review): the while loop below never calls edrsIter.next(), so if the
// iterator is non-empty, hasNext() stays true and the loop never terminates.
// `results` is also not defined anywhere in this snippet.
CoGroupFunction<Long, EDR2, Row, ProcessEdr> rateEDRs = (subscription_id, edrsIter, chargesIter) -> {
Logger log = org.apache.log4j.LogManager.getLogger("myLogger");
log.warn("inside rateEDRs function");
while (edrsIter.hasNext()) {
appProvider.value(); // HERE
}
return results.iterator();
};
}
我收到了这个错误
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.opencell.spark.jobs.UsageJobDS.rateEDRs of type org.apache.spark.api.java.function.CoGroupFunction in instance of org.opencell.spark.jobs.UsageJobDS
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
实际上,如果将 cogroup 函数定义更改为以下代码,它就可以工作。但是,错误原因仍然未知。
// Working variant: the lambda is passed directly to cogroup instead of being
// stored in a CoGroupFunction-typed instance field, so no SerializedLambda is
// ever deserialized into that field — presumably why the ClassCastException
// disappears; TODO confirm root cause.
Dataset<ProcessEdr> cogg = edrs.cogroup(charges, (subscription_id, edrsIter, chargesIter) -> {
ArrayList<ProcessEdr> results = new ArrayList<>();
System.out.println("App Provider name" + appProvider.value().getIssuer_name());
return results.iterator();
}, Encoders.bean(ProcessEdr.class));