Spark:将RDD映射到RDD返回空指针异常
Spark: Mapping RDD to RDD returning nullpointer Exception
按照我的逻辑,我试图将 JDBCRDD - TopologyRDD 映射到 JavaRDD 的 JavaBean。
TopologyRDD.count() returns 正确的行数,所以我知道数据已正确加载。我还将 TopologyRDD 映射到 List 以确认数据已加载,并且它加载到数据中就好了。
问题:将 TopologyRDD 映射到 MODEL_TOPOLOGYRDD 时,MODEL_TOPOLOGYRDD.count() 操作返回空指针异常错误。
型号:
public class modelTopology implements Serializable {
private String A_TYPE;
private String Z_TYPE;
private String A_CLLI;
private String Z_CLLI;
private String A_HOSTNAME;
private String Z_HOSTNAME;
private String A_LOCATION;
private String A_LOC_TYPE;
private String Z_LOCATION;
private String Z_LOC_TYPE;
private String A_SHELF;
private String A_SLOT;
private String A_CARD;
private String A_PORT;
private String A_INTERFACE;
private String A_IF_DESC;
private String Z_SHELF;
private String Z_SLOT;
private String Z_CARD;
private String Z_PORT;
private String Z_INTERFACE;
private String Z_IF_DESC;
private String A_CARD_NAME;
private String Z_CARD_NAME;
private String PHY_CIRCUIT_ID;
private String LAG_CIRCUIT_ID;
private String PHY_CIRCUIT_ALIAS;
private String A_VENDOR;
private String A_MODEL;
private String A_TECHNOLOGY;
private String Z_VENDOR;
private String Z_MODEL;
private String Z_TECHNOLOGY;
private String A_EH_ELEMENT_ID;
private String A_EH_MACHINE_ID;
private String Z_EH_ELEMENT_ID;
private String Z_EH_MACHINE_ID;
private String A_EH_SPEED;
private String Z_EH_SPEED;
private String A_EH_SPEED1;
private String Z_EH_SPEED1;
private String A_EH_EHEALTH_DOMAIN;
private String Z_EH_EHEALTH_DOMAIN;
private String A_MRTG_HOSTID;
private String A_MRTG_GRPID;
private String A_MRTG_IFID;
private String Z_MRTG_HOSTID;
private String Z_MRTG_GRPID;
private String Z_MRTG_IFID;
private String A_MGMT_IP;
private String Z_MGMT_IP;
private String A_IF_INDEX;
private String Z_IF_INDEX;
private String IS_PROD;
private String TOPOLOGY_KEY;
private String COMMIT_TS;
public void setA_TYPE(String A_TYPE) {
this.A_TYPE = A_TYPE;
}
public void setZ_TYPE(String Z_TYPE) {
this.Z_TYPE = Z_TYPE;
}
public void setA_CLLI(String A_CLLI) {
this.A_CLLI = A_CLLI;
}
public void setZ_CLLI(String Z_CLLI) {
this.Z_CLLI = Z_CLLI;
}
public void setA_HOSTNAME(String A_HOSTNAME) {
this.A_HOSTNAME = A_HOSTNAME;
}
public void setZ_HOSTNAME(String Z_HOSTNAME) {
this.Z_HOSTNAME = Z_HOSTNAME;
}
public void setA_LOCATION(String A_LOCATION) {
this.A_LOCATION = A_LOCATION;
}
public void setA_LOC_TYPE(String A_LOC_TYPE) {
this.A_LOC_TYPE = A_LOC_TYPE;
}
public void setZ_LOCATION(String Z_LOCATION) {
this.Z_LOCATION = Z_LOCATION;
}
public void setZ_LOC_TYPE(String Z_LOC_TYPE) {
this.Z_LOC_TYPE = Z_LOC_TYPE;
}
public void setA_SHELF(String A_SHELF) {
this.A_SHELF = A_SHELF;
}
public void setA_SLOT(String A_SLOT) {
this.A_SLOT = A_SLOT;
}
public void setA_CARD(String A_CARD) {
this.A_CARD = A_CARD;
}
public void setA_PORT(String A_PORT) {
this.A_PORT = A_PORT;
}
public void setA_INTERFACE(String A_INTERFACE) {
this.A_INTERFACE = A_INTERFACE;
}
public void setA_IF_DESC(String A_IF_DESC) {
this.A_IF_DESC = A_IF_DESC;
}
public void setZ_SHELF(String Z_SHELF) {
this.Z_SHELF = Z_SHELF;
}
public void setZ_SLOT(String Z_SLOT) {
this.Z_SLOT = Z_SLOT;
}
public void setZ_CARD(String Z_CARD) {
this.Z_CARD = Z_CARD;
}
public void setZ_PORT(String Z_PORT) {
this.Z_PORT = Z_PORT;
}
public void setZ_INTERFACE(String Z_INTERFACE) {
this.Z_INTERFACE = Z_INTERFACE;
}
public void setZ_IF_DESC(String Z_IF_DESC) {
this.Z_IF_DESC = Z_IF_DESC;
}
public void setA_CARD_NAME(String A_CARD_NAME) {
this.A_CARD_NAME = A_CARD_NAME;
}
public void setZ_CARD_NAME(String Z_CARD_NAME) {
this.Z_CARD_NAME = Z_CARD_NAME;
}
public void setPHY_CIRCUIT_ID(String PHY_CIRCUIT_ID) {
this.PHY_CIRCUIT_ID = PHY_CIRCUIT_ID;
}
public void setLAG_CIRCUIT_ID(String LAG_CIRCUIT_ID) {
this.LAG_CIRCUIT_ID = LAG_CIRCUIT_ID;
}
public void setPHY_CIRCUIT_ALIAS(String PHY_CIRCUIT_ALIAS) {
this.PHY_CIRCUIT_ALIAS = PHY_CIRCUIT_ALIAS;
}
public void setA_VENDOR(String A_VENDOR) {
this.A_VENDOR = A_VENDOR;
}
public void setA_MODEL(String A_MODEL) {
this.A_MODEL = A_MODEL;
}
public void setA_TECHNOLOGY(String A_TECHNOLOGY) {
this.A_TECHNOLOGY = A_TECHNOLOGY;
}
public void setZ_VENDOR(String Z_VENDOR) {
this.Z_VENDOR = Z_VENDOR;
}
public void setZ_MODEL(String Z_MODEL) {
this.Z_MODEL = Z_MODEL;
}
public void setZ_TECHNOLOGY(String Z_TECHNOLOGY) {
this.Z_TECHNOLOGY = Z_TECHNOLOGY;
}
public void setA_EH_ELEMENT_ID(String A_EH_ELEMENT_ID) {
this.A_EH_ELEMENT_ID = A_EH_ELEMENT_ID;
}
public void setA_EH_MACHINE_ID(String A_EH_MACHINE_ID) {
this.A_EH_MACHINE_ID = A_EH_MACHINE_ID;
}
public void setZ_EH_ELEMENT_ID(String Z_EH_ELEMENT_ID) {
this.Z_EH_ELEMENT_ID = Z_EH_ELEMENT_ID;
}
public void setZ_EH_MACHINE_ID(String Z_EH_MACHINE_ID) {
this.Z_EH_MACHINE_ID = Z_EH_MACHINE_ID;
}
public void setA_EH_SPEED(String A_EH_SPEED) {
this.A_EH_SPEED = A_EH_SPEED;
}
public void setZ_EH_SPEED(String Z_EH_SPEED) {
this.Z_EH_SPEED = Z_EH_SPEED;
}
public void setA_EH_SPEED1(String A_EH_SPEED1) {
this.A_EH_SPEED1 = A_EH_SPEED1;
}
public void setZ_EH_SPEED1(String Z_EH_SPEED1) {
this.Z_EH_SPEED1 = Z_EH_SPEED1;
}
public void setA_EH_EHEALTH_DOMAIN(String A_EH_EHEALTH_DOMAIN) {
this.A_EH_EHEALTH_DOMAIN = A_EH_EHEALTH_DOMAIN;
}
public void setZ_EH_EHEALTH_DOMAIN(String Z_EH_EHEALTH_DOMAIN) {
this.Z_EH_EHEALTH_DOMAIN = Z_EH_EHEALTH_DOMAIN;
}
public void setA_MRTG_HOSTID(String A_MRTG_HOSTID) {
this.A_MRTG_HOSTID = A_MRTG_HOSTID;
}
public void setA_MRTG_GRPID(String A_MRTG_GRPID) {
this.A_MRTG_GRPID = A_MRTG_GRPID;
}
public void setA_MRTG_IFID(String A_MRTG_IFID) {
this.A_MRTG_IFID = A_MRTG_IFID;
}
public void setZ_MRTG_HOSTID(String Z_MRTG_HOSTID) {
this.Z_MRTG_HOSTID = Z_MRTG_HOSTID;
}
public void setZ_MRTG_GRPID(String Z_MRTG_GRPID) {
this.Z_MRTG_GRPID = Z_MRTG_GRPID;
}
public void setZ_MRTG_IFID(String Z_MRTG_IFID) {
this.Z_MRTG_IFID = Z_MRTG_IFID;
}
public void setA_MGMT_IP(String A_MGMT_IP) {
this.A_MGMT_IP = A_MGMT_IP;
}
public void setZ_MGMT_IP(String Z_MGMT_IP) {
this.Z_MGMT_IP = Z_MGMT_IP;
}
public void setA_IF_INDEX(String A_IF_INDEX) {
this.A_IF_INDEX = A_IF_INDEX;
}
public void setZ_IF_INDEX(String Z_IF_INDEX) {
this.Z_IF_INDEX = Z_IF_INDEX;
}
public void setIS_PROD(String IS_PROD) {
this.IS_PROD = IS_PROD;
}
public void setTOPOLOGY_KEY(String TOPOLOGY_KEY) {
this.TOPOLOGY_KEY = TOPOLOGY_KEY;
}
public void setCOMMIT_TS(String COMMIT_TS) {
this.COMMIT_TS = COMMIT_TS;
}
public String getA_TYPE() {
return A_TYPE;
}
public String getZ_TYPE() {
return Z_TYPE;
}
public String getA_CLLI() {
return A_CLLI;
}
public String getZ_CLLI() {
return Z_CLLI;
}
public String getA_HOSTNAME() {
return A_HOSTNAME;
}
public String getZ_HOSTNAME() {
return Z_HOSTNAME;
}
public String getA_LOCATION() {
return A_LOCATION;
}
public String getA_LOC_TYPE() {
return A_LOC_TYPE;
}
public String getZ_LOCATION() {
return Z_LOCATION;
}
public String getZ_LOC_TYPE() {
return Z_LOC_TYPE;
}
public String getA_SHELF() {
return A_SHELF;
}
public String getA_SLOT() {
return A_SLOT;
}
public String getA_CARD() {
return A_CARD;
}
public String getA_PORT() {
return A_PORT;
}
public String getA_INTERFACE() {
return A_INTERFACE;
}
public String getA_IF_DESC() {
return A_IF_DESC;
}
public String getZ_SHELF() {
return Z_SHELF;
}
public String getZ_SLOT() {
return Z_SLOT;
}
public String getZ_CARD() {
return Z_CARD;
}
public String getZ_PORT() {
return Z_PORT;
}
public String getZ_INTERFACE() {
return Z_INTERFACE;
}
public String getZ_IF_DESC() {
return Z_IF_DESC;
}
public String getA_CARD_NAME() {
return A_CARD_NAME;
}
public String getZ_CARD_NAME() {
return Z_CARD_NAME;
}
public String getPHY_CIRCUIT_ID() {
return PHY_CIRCUIT_ID;
}
public String getLAG_CIRCUIT_ID() {
return LAG_CIRCUIT_ID;
}
public String getPHY_CIRCUIT_ALIAS() {
return PHY_CIRCUIT_ALIAS;
}
public String getA_VENDOR() {
return A_VENDOR;
}
public String getA_MODEL() {
return A_MODEL;
}
public String getA_TECHNOLOGY() {
return A_TECHNOLOGY;
}
public String getZ_VENDOR() {
return Z_VENDOR;
}
public String getZ_MODEL() {
return Z_MODEL;
}
public String getZ_TECHNOLOGY() {
return Z_TECHNOLOGY;
}
public String getA_EH_ELEMENT_ID() {
return A_EH_ELEMENT_ID;
}
public String getA_EH_MACHINE_ID() {
return A_EH_MACHINE_ID;
}
public String getZ_EH_ELEMENT_ID() {
return Z_EH_ELEMENT_ID;
}
public String getZ_EH_MACHINE_ID() {
return Z_EH_MACHINE_ID;
}
public String getA_EH_SPEED() {
return A_EH_SPEED;
}
public String getZ_EH_SPEED() {
return Z_EH_SPEED;
}
public String getA_EH_SPEED1() {
return A_EH_SPEED1;
}
public String getZ_EH_SPEED1() {
return Z_EH_SPEED1;
}
public String getA_EH_EHEALTH_DOMAIN() {
return A_EH_EHEALTH_DOMAIN;
}
public String getZ_EH_EHEALTH_DOMAIN() {
return Z_EH_EHEALTH_DOMAIN;
}
public String getA_MRTG_HOSTID() {
return A_MRTG_HOSTID;
}
public String getA_MRTG_GRPID() {
return A_MRTG_GRPID;
}
public String getA_MRTG_IFID() {
return A_MRTG_IFID;
}
public String getZ_MRTG_HOSTID() {
return Z_MRTG_HOSTID;
}
public String getZ_MRTG_GRPID() {
return Z_MRTG_GRPID;
}
public String getZ_MRTG_IFID() {
return Z_MRTG_IFID;
}
public String getA_MGMT_IP() {
return A_MGMT_IP;
}
public String getZ_MGMT_IP() {
return Z_MGMT_IP;
}
public String getA_IF_INDEX() {
return A_IF_INDEX;
}
public String getZ_IF_INDEX() {
return Z_IF_INDEX;
}
public String getIS_PROD() {
return IS_PROD;
}
public String getTOPOLOGY_KEY() {
return TOPOLOGY_KEY;
}
public String getCOMMIT_TS() {
return COMMIT_TS;
}
}
代码:
//Push into JDBC RDD Object
JdbcRDD<Object[]> TopologyJDBCRDD = new JdbcRDD(sc.sc(),oraclecon,"SELECT * FROM NPIDWUAT.FIOS_TOPOLOGY_STG WHERE ? = ?",1,1,1, new MapResult(),ClassManifestFactory$.MODULE$.fromClass(Object[].class));
//Convert to JavaRDD
JavaRDD<Object[]> TopologyRDD = JavaRDD.fromRDD(TopologyJDBCRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));
System.out.println("Number of Records: " + TopologyRDD.count());
JavaRDD<modelTopology> MODEL_TOPOLOGYRDD = TopologyRDD.map(
new Function<Object[], modelTopology>() {
@Override
public modelTopology call(final Object[] line) throws Exception {
Object[] line2 = line;
for(int i=0;i<line2.length;i++){
if(line2[i].toString() == null){
line2[i] = "";
}
else{
line2[i] = line2[i].toString();
}
}
modelTopology toporow = new modelTopology();
toporow.setA_TYPE(line2[0].toString());
toporow.setZ_TYPE(line2[1].toString());
toporow.setA_CLLI(line2[2].toString());
toporow.setZ_CLLI(line2[3].toString());
toporow.setA_HOSTNAME(line2[4].toString());
toporow.setZ_HOSTNAME(line2[5].toString());
toporow.setA_LOCATION(line2[6].toString());
toporow.setA_LOC_TYPE(line2[7].toString());
toporow.setZ_LOCATION(line2[8].toString());
toporow.setZ_LOC_TYPE(line2[9].toString());
toporow.setA_SHELF(line2[10].toString());
toporow.setA_SLOT(line2[11].toString());
toporow.setA_CARD(line2[12].toString());
toporow.setA_PORT(line2[13].toString());
toporow.setA_INTERFACE(line2[14].toString());
toporow.setA_IF_DESC(line2[15].toString());
toporow.setZ_SHELF(line2[16].toString());
toporow.setZ_SLOT(line2[17].toString());
toporow.setZ_CARD(line2[18].toString());
toporow.setZ_PORT(line2[19].toString());
toporow.setZ_INTERFACE(line2[20].toString());
toporow.setZ_IF_DESC(line2[21].toString());
toporow.setA_CARD_NAME(line2[22].toString());
toporow.setZ_CARD_NAME(line2[23].toString());
toporow.setPHY_CIRCUIT_ID(line2[24].toString());
toporow.setLAG_CIRCUIT_ID(line2[25].toString());
toporow.setPHY_CIRCUIT_ALIAS(line2[26].toString());
toporow.setA_VENDOR(line2[27].toString());
toporow.setA_MODEL(line2[28].toString());
toporow.setA_TECHNOLOGY(line2[29].toString());
toporow.setZ_VENDOR(line2[30].toString());
toporow.setZ_MODEL(line2[31].toString());
toporow.setZ_TECHNOLOGY(line2[32].toString());
toporow.setA_EH_ELEMENT_ID(line2[33].toString());
toporow.setA_EH_MACHINE_ID(line2[34].toString());
toporow.setZ_EH_ELEMENT_ID(line2[35].toString());
toporow.setZ_EH_MACHINE_ID(line2[36].toString());
toporow.setA_EH_SPEED(line2[37].toString());
toporow.setZ_EH_SPEED(line2[38].toString());
toporow.setA_EH_SPEED1(line2[39].toString());
toporow.setZ_EH_SPEED1(line2[40].toString());
toporow.setA_EH_EHEALTH_DOMAIN(line2[41].toString());
toporow.setZ_EH_EHEALTH_DOMAIN(line2[42].toString());
toporow.setA_MRTG_HOSTID(line2[43].toString());
toporow.setA_MRTG_GRPID(line2[44].toString());
toporow.setA_MRTG_IFID(line2[45].toString());
toporow.setZ_MRTG_HOSTID(line2[46].toString());
toporow.setZ_MRTG_GRPID(line2[47].toString());
toporow.setZ_MRTG_IFID(line2[48].toString());
toporow.setA_MGMT_IP(line2[49].toString());
toporow.setZ_MGMT_IP(line2[50].toString());
toporow.setA_IF_INDEX(line2[51].toString());
toporow.setZ_IF_INDEX(line2[52].toString());
toporow.setIS_PROD(line2[53].toString());
toporow.setTOPOLOGY_KEY(line2[54].toString());
toporow.setCOMMIT_TS(line2[55].toString());
return toporow;
}
});
System.out.println("MODEL_TOPOLOGYRDD COUNT: " + MODEL_TOPOLOGYRDD.count());
堆栈跟踪:
Application Failed...org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at com.verizon.npi.MainApp.call(MainApp.java:103)
at com.verizon.npi.MainApp.call(MainApp.java:96)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我能看到这段代码的唯一问题是是否有任何 line2[i]
为空。
并且 if(line2[i].toString() == null)
将不起作用,因为 toString()
永远不会 returns null
用于 Object
class。
我看到您希望任何 null 值都变成空字符串,所以尝试将此作为您的 for 循环
for(int i = 0; i < line2.length; i++){
line2[i] = (line2[i] == null) ? "" : String.valueOf(line2[i]);
}
注意 String.valueOf
的使用,它在尝试获取对象的 String 值时完全避免了 NullPointerException。
按照我的逻辑,我试图将 JDBCRDD - TopologyRDD 映射到 JavaRDD 的 JavaBean。
TopologyRDD.count() returns 正确的行数,所以我知道数据已正确加载。我还将 TopologyRDD 映射到 List 以确认数据已加载,并且它加载到数据中就好了。
问题:将 TopologyRDD 映射到 MODEL_TOPOLOGYRDD 时,MODEL_TOPOLOGYRDD.count() 操作返回空指针异常错误。
型号:
public class modelTopology implements Serializable {
private String A_TYPE;
private String Z_TYPE;
private String A_CLLI;
private String Z_CLLI;
private String A_HOSTNAME;
private String Z_HOSTNAME;
private String A_LOCATION;
private String A_LOC_TYPE;
private String Z_LOCATION;
private String Z_LOC_TYPE;
private String A_SHELF;
private String A_SLOT;
private String A_CARD;
private String A_PORT;
private String A_INTERFACE;
private String A_IF_DESC;
private String Z_SHELF;
private String Z_SLOT;
private String Z_CARD;
private String Z_PORT;
private String Z_INTERFACE;
private String Z_IF_DESC;
private String A_CARD_NAME;
private String Z_CARD_NAME;
private String PHY_CIRCUIT_ID;
private String LAG_CIRCUIT_ID;
private String PHY_CIRCUIT_ALIAS;
private String A_VENDOR;
private String A_MODEL;
private String A_TECHNOLOGY;
private String Z_VENDOR;
private String Z_MODEL;
private String Z_TECHNOLOGY;
private String A_EH_ELEMENT_ID;
private String A_EH_MACHINE_ID;
private String Z_EH_ELEMENT_ID;
private String Z_EH_MACHINE_ID;
private String A_EH_SPEED;
private String Z_EH_SPEED;
private String A_EH_SPEED1;
private String Z_EH_SPEED1;
private String A_EH_EHEALTH_DOMAIN;
private String Z_EH_EHEALTH_DOMAIN;
private String A_MRTG_HOSTID;
private String A_MRTG_GRPID;
private String A_MRTG_IFID;
private String Z_MRTG_HOSTID;
private String Z_MRTG_GRPID;
private String Z_MRTG_IFID;
private String A_MGMT_IP;
private String Z_MGMT_IP;
private String A_IF_INDEX;
private String Z_IF_INDEX;
private String IS_PROD;
private String TOPOLOGY_KEY;
private String COMMIT_TS;
public void setA_TYPE(String A_TYPE) {
this.A_TYPE = A_TYPE;
}
public void setZ_TYPE(String Z_TYPE) {
this.Z_TYPE = Z_TYPE;
}
public void setA_CLLI(String A_CLLI) {
this.A_CLLI = A_CLLI;
}
public void setZ_CLLI(String Z_CLLI) {
this.Z_CLLI = Z_CLLI;
}
public void setA_HOSTNAME(String A_HOSTNAME) {
this.A_HOSTNAME = A_HOSTNAME;
}
public void setZ_HOSTNAME(String Z_HOSTNAME) {
this.Z_HOSTNAME = Z_HOSTNAME;
}
public void setA_LOCATION(String A_LOCATION) {
this.A_LOCATION = A_LOCATION;
}
public void setA_LOC_TYPE(String A_LOC_TYPE) {
this.A_LOC_TYPE = A_LOC_TYPE;
}
public void setZ_LOCATION(String Z_LOCATION) {
this.Z_LOCATION = Z_LOCATION;
}
public void setZ_LOC_TYPE(String Z_LOC_TYPE) {
this.Z_LOC_TYPE = Z_LOC_TYPE;
}
public void setA_SHELF(String A_SHELF) {
this.A_SHELF = A_SHELF;
}
public void setA_SLOT(String A_SLOT) {
this.A_SLOT = A_SLOT;
}
public void setA_CARD(String A_CARD) {
this.A_CARD = A_CARD;
}
public void setA_PORT(String A_PORT) {
this.A_PORT = A_PORT;
}
public void setA_INTERFACE(String A_INTERFACE) {
this.A_INTERFACE = A_INTERFACE;
}
public void setA_IF_DESC(String A_IF_DESC) {
this.A_IF_DESC = A_IF_DESC;
}
public void setZ_SHELF(String Z_SHELF) {
this.Z_SHELF = Z_SHELF;
}
public void setZ_SLOT(String Z_SLOT) {
this.Z_SLOT = Z_SLOT;
}
public void setZ_CARD(String Z_CARD) {
this.Z_CARD = Z_CARD;
}
public void setZ_PORT(String Z_PORT) {
this.Z_PORT = Z_PORT;
}
public void setZ_INTERFACE(String Z_INTERFACE) {
this.Z_INTERFACE = Z_INTERFACE;
}
public void setZ_IF_DESC(String Z_IF_DESC) {
this.Z_IF_DESC = Z_IF_DESC;
}
public void setA_CARD_NAME(String A_CARD_NAME) {
this.A_CARD_NAME = A_CARD_NAME;
}
public void setZ_CARD_NAME(String Z_CARD_NAME) {
this.Z_CARD_NAME = Z_CARD_NAME;
}
public void setPHY_CIRCUIT_ID(String PHY_CIRCUIT_ID) {
this.PHY_CIRCUIT_ID = PHY_CIRCUIT_ID;
}
public void setLAG_CIRCUIT_ID(String LAG_CIRCUIT_ID) {
this.LAG_CIRCUIT_ID = LAG_CIRCUIT_ID;
}
public void setPHY_CIRCUIT_ALIAS(String PHY_CIRCUIT_ALIAS) {
this.PHY_CIRCUIT_ALIAS = PHY_CIRCUIT_ALIAS;
}
public void setA_VENDOR(String A_VENDOR) {
this.A_VENDOR = A_VENDOR;
}
public void setA_MODEL(String A_MODEL) {
this.A_MODEL = A_MODEL;
}
public void setA_TECHNOLOGY(String A_TECHNOLOGY) {
this.A_TECHNOLOGY = A_TECHNOLOGY;
}
public void setZ_VENDOR(String Z_VENDOR) {
this.Z_VENDOR = Z_VENDOR;
}
public void setZ_MODEL(String Z_MODEL) {
this.Z_MODEL = Z_MODEL;
}
public void setZ_TECHNOLOGY(String Z_TECHNOLOGY) {
this.Z_TECHNOLOGY = Z_TECHNOLOGY;
}
public void setA_EH_ELEMENT_ID(String A_EH_ELEMENT_ID) {
this.A_EH_ELEMENT_ID = A_EH_ELEMENT_ID;
}
public void setA_EH_MACHINE_ID(String A_EH_MACHINE_ID) {
this.A_EH_MACHINE_ID = A_EH_MACHINE_ID;
}
public void setZ_EH_ELEMENT_ID(String Z_EH_ELEMENT_ID) {
this.Z_EH_ELEMENT_ID = Z_EH_ELEMENT_ID;
}
public void setZ_EH_MACHINE_ID(String Z_EH_MACHINE_ID) {
this.Z_EH_MACHINE_ID = Z_EH_MACHINE_ID;
}
public void setA_EH_SPEED(String A_EH_SPEED) {
this.A_EH_SPEED = A_EH_SPEED;
}
public void setZ_EH_SPEED(String Z_EH_SPEED) {
this.Z_EH_SPEED = Z_EH_SPEED;
}
public void setA_EH_SPEED1(String A_EH_SPEED1) {
this.A_EH_SPEED1 = A_EH_SPEED1;
}
public void setZ_EH_SPEED1(String Z_EH_SPEED1) {
this.Z_EH_SPEED1 = Z_EH_SPEED1;
}
public void setA_EH_EHEALTH_DOMAIN(String A_EH_EHEALTH_DOMAIN) {
this.A_EH_EHEALTH_DOMAIN = A_EH_EHEALTH_DOMAIN;
}
public void setZ_EH_EHEALTH_DOMAIN(String Z_EH_EHEALTH_DOMAIN) {
this.Z_EH_EHEALTH_DOMAIN = Z_EH_EHEALTH_DOMAIN;
}
public void setA_MRTG_HOSTID(String A_MRTG_HOSTID) {
this.A_MRTG_HOSTID = A_MRTG_HOSTID;
}
public void setA_MRTG_GRPID(String A_MRTG_GRPID) {
this.A_MRTG_GRPID = A_MRTG_GRPID;
}
public void setA_MRTG_IFID(String A_MRTG_IFID) {
this.A_MRTG_IFID = A_MRTG_IFID;
}
public void setZ_MRTG_HOSTID(String Z_MRTG_HOSTID) {
this.Z_MRTG_HOSTID = Z_MRTG_HOSTID;
}
public void setZ_MRTG_GRPID(String Z_MRTG_GRPID) {
this.Z_MRTG_GRPID = Z_MRTG_GRPID;
}
public void setZ_MRTG_IFID(String Z_MRTG_IFID) {
this.Z_MRTG_IFID = Z_MRTG_IFID;
}
public void setA_MGMT_IP(String A_MGMT_IP) {
this.A_MGMT_IP = A_MGMT_IP;
}
public void setZ_MGMT_IP(String Z_MGMT_IP) {
this.Z_MGMT_IP = Z_MGMT_IP;
}
public void setA_IF_INDEX(String A_IF_INDEX) {
this.A_IF_INDEX = A_IF_INDEX;
}
public void setZ_IF_INDEX(String Z_IF_INDEX) {
this.Z_IF_INDEX = Z_IF_INDEX;
}
public void setIS_PROD(String IS_PROD) {
this.IS_PROD = IS_PROD;
}
public void setTOPOLOGY_KEY(String TOPOLOGY_KEY) {
this.TOPOLOGY_KEY = TOPOLOGY_KEY;
}
public void setCOMMIT_TS(String COMMIT_TS) {
this.COMMIT_TS = COMMIT_TS;
}
public String getA_TYPE() {
return A_TYPE;
}
public String getZ_TYPE() {
return Z_TYPE;
}
public String getA_CLLI() {
return A_CLLI;
}
public String getZ_CLLI() {
return Z_CLLI;
}
public String getA_HOSTNAME() {
return A_HOSTNAME;
}
public String getZ_HOSTNAME() {
return Z_HOSTNAME;
}
public String getA_LOCATION() {
return A_LOCATION;
}
public String getA_LOC_TYPE() {
return A_LOC_TYPE;
}
public String getZ_LOCATION() {
return Z_LOCATION;
}
public String getZ_LOC_TYPE() {
return Z_LOC_TYPE;
}
public String getA_SHELF() {
return A_SHELF;
}
public String getA_SLOT() {
return A_SLOT;
}
public String getA_CARD() {
return A_CARD;
}
public String getA_PORT() {
return A_PORT;
}
public String getA_INTERFACE() {
return A_INTERFACE;
}
public String getA_IF_DESC() {
return A_IF_DESC;
}
public String getZ_SHELF() {
return Z_SHELF;
}
public String getZ_SLOT() {
return Z_SLOT;
}
public String getZ_CARD() {
return Z_CARD;
}
public String getZ_PORT() {
return Z_PORT;
}
public String getZ_INTERFACE() {
return Z_INTERFACE;
}
public String getZ_IF_DESC() {
return Z_IF_DESC;
}
public String getA_CARD_NAME() {
return A_CARD_NAME;
}
public String getZ_CARD_NAME() {
return Z_CARD_NAME;
}
public String getPHY_CIRCUIT_ID() {
return PHY_CIRCUIT_ID;
}
public String getLAG_CIRCUIT_ID() {
return LAG_CIRCUIT_ID;
}
public String getPHY_CIRCUIT_ALIAS() {
return PHY_CIRCUIT_ALIAS;
}
public String getA_VENDOR() {
return A_VENDOR;
}
public String getA_MODEL() {
return A_MODEL;
}
public String getA_TECHNOLOGY() {
return A_TECHNOLOGY;
}
public String getZ_VENDOR() {
return Z_VENDOR;
}
public String getZ_MODEL() {
return Z_MODEL;
}
public String getZ_TECHNOLOGY() {
return Z_TECHNOLOGY;
}
public String getA_EH_ELEMENT_ID() {
return A_EH_ELEMENT_ID;
}
public String getA_EH_MACHINE_ID() {
return A_EH_MACHINE_ID;
}
public String getZ_EH_ELEMENT_ID() {
return Z_EH_ELEMENT_ID;
}
public String getZ_EH_MACHINE_ID() {
return Z_EH_MACHINE_ID;
}
public String getA_EH_SPEED() {
return A_EH_SPEED;
}
public String getZ_EH_SPEED() {
return Z_EH_SPEED;
}
public String getA_EH_SPEED1() {
return A_EH_SPEED1;
}
public String getZ_EH_SPEED1() {
return Z_EH_SPEED1;
}
public String getA_EH_EHEALTH_DOMAIN() {
return A_EH_EHEALTH_DOMAIN;
}
public String getZ_EH_EHEALTH_DOMAIN() {
return Z_EH_EHEALTH_DOMAIN;
}
public String getA_MRTG_HOSTID() {
return A_MRTG_HOSTID;
}
public String getA_MRTG_GRPID() {
return A_MRTG_GRPID;
}
public String getA_MRTG_IFID() {
return A_MRTG_IFID;
}
public String getZ_MRTG_HOSTID() {
return Z_MRTG_HOSTID;
}
public String getZ_MRTG_GRPID() {
return Z_MRTG_GRPID;
}
public String getZ_MRTG_IFID() {
return Z_MRTG_IFID;
}
public String getA_MGMT_IP() {
return A_MGMT_IP;
}
public String getZ_MGMT_IP() {
return Z_MGMT_IP;
}
public String getA_IF_INDEX() {
return A_IF_INDEX;
}
public String getZ_IF_INDEX() {
return Z_IF_INDEX;
}
public String getIS_PROD() {
return IS_PROD;
}
public String getTOPOLOGY_KEY() {
return TOPOLOGY_KEY;
}
public String getCOMMIT_TS() {
return COMMIT_TS;
}
}
代码:
//Push into JDBC RDD Object
JdbcRDD<Object[]> TopologyJDBCRDD = new JdbcRDD(sc.sc(),oraclecon,"SELECT * FROM NPIDWUAT.FIOS_TOPOLOGY_STG WHERE ? = ?",1,1,1, new MapResult(),ClassManifestFactory$.MODULE$.fromClass(Object[].class));
//Convert to JavaRDD
JavaRDD<Object[]> TopologyRDD = JavaRDD.fromRDD(TopologyJDBCRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));
System.out.println("Number of Records: " + TopologyRDD.count());
JavaRDD<modelTopology> MODEL_TOPOLOGYRDD = TopologyRDD.map(
new Function<Object[], modelTopology>() {
@Override
public modelTopology call(final Object[] line) throws Exception {
Object[] line2 = line;
for(int i=0;i<line2.length;i++){
if(line2[i].toString() == null){
line2[i] = "";
}
else{
line2[i] = line2[i].toString();
}
}
modelTopology toporow = new modelTopology();
toporow.setA_TYPE(line2[0].toString());
toporow.setZ_TYPE(line2[1].toString());
toporow.setA_CLLI(line2[2].toString());
toporow.setZ_CLLI(line2[3].toString());
toporow.setA_HOSTNAME(line2[4].toString());
toporow.setZ_HOSTNAME(line2[5].toString());
toporow.setA_LOCATION(line2[6].toString());
toporow.setA_LOC_TYPE(line2[7].toString());
toporow.setZ_LOCATION(line2[8].toString());
toporow.setZ_LOC_TYPE(line2[9].toString());
toporow.setA_SHELF(line2[10].toString());
toporow.setA_SLOT(line2[11].toString());
toporow.setA_CARD(line2[12].toString());
toporow.setA_PORT(line2[13].toString());
toporow.setA_INTERFACE(line2[14].toString());
toporow.setA_IF_DESC(line2[15].toString());
toporow.setZ_SHELF(line2[16].toString());
toporow.setZ_SLOT(line2[17].toString());
toporow.setZ_CARD(line2[18].toString());
toporow.setZ_PORT(line2[19].toString());
toporow.setZ_INTERFACE(line2[20].toString());
toporow.setZ_IF_DESC(line2[21].toString());
toporow.setA_CARD_NAME(line2[22].toString());
toporow.setZ_CARD_NAME(line2[23].toString());
toporow.setPHY_CIRCUIT_ID(line2[24].toString());
toporow.setLAG_CIRCUIT_ID(line2[25].toString());
toporow.setPHY_CIRCUIT_ALIAS(line2[26].toString());
toporow.setA_VENDOR(line2[27].toString());
toporow.setA_MODEL(line2[28].toString());
toporow.setA_TECHNOLOGY(line2[29].toString());
toporow.setZ_VENDOR(line2[30].toString());
toporow.setZ_MODEL(line2[31].toString());
toporow.setZ_TECHNOLOGY(line2[32].toString());
toporow.setA_EH_ELEMENT_ID(line2[33].toString());
toporow.setA_EH_MACHINE_ID(line2[34].toString());
toporow.setZ_EH_ELEMENT_ID(line2[35].toString());
toporow.setZ_EH_MACHINE_ID(line2[36].toString());
toporow.setA_EH_SPEED(line2[37].toString());
toporow.setZ_EH_SPEED(line2[38].toString());
toporow.setA_EH_SPEED1(line2[39].toString());
toporow.setZ_EH_SPEED1(line2[40].toString());
toporow.setA_EH_EHEALTH_DOMAIN(line2[41].toString());
toporow.setZ_EH_EHEALTH_DOMAIN(line2[42].toString());
toporow.setA_MRTG_HOSTID(line2[43].toString());
toporow.setA_MRTG_GRPID(line2[44].toString());
toporow.setA_MRTG_IFID(line2[45].toString());
toporow.setZ_MRTG_HOSTID(line2[46].toString());
toporow.setZ_MRTG_GRPID(line2[47].toString());
toporow.setZ_MRTG_IFID(line2[48].toString());
toporow.setA_MGMT_IP(line2[49].toString());
toporow.setZ_MGMT_IP(line2[50].toString());
toporow.setA_IF_INDEX(line2[51].toString());
toporow.setZ_IF_INDEX(line2[52].toString());
toporow.setIS_PROD(line2[53].toString());
toporow.setTOPOLOGY_KEY(line2[54].toString());
toporow.setCOMMIT_TS(line2[55].toString());
return toporow;
}
});
System.out.println("MODEL_TOPOLOGYRDD COUNT: " + MODEL_TOPOLOGYRDD.count());
堆栈跟踪:
Application Failed...org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at com.verizon.npi.MainApp.call(MainApp.java:103)
at com.verizon.npi.MainApp.call(MainApp.java:96)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我能看到这段代码的唯一问题是是否有任何 line2[i]
为空。
并且 if(line2[i].toString() == null)
将不起作用,因为 toString()
永远不会 returns null
用于 Object
class。
我看到您希望任何 null 值都变成空字符串,所以尝试将此作为您的 for 循环
for(int i = 0; i < line2.length; i++){
line2[i] = (line2[i] == null) ? "" : String.valueOf(line2[i]);
}
注意 String.valueOf
的使用,它在尝试获取对象的 String 值时完全避免了 NullPointerException。