ClassCastException when converting an RDD to a DataFrame in Spark Streaming
Hi everyone, I have the following problem. I am using Apache Spark Streaming v1.6.0 with Java to consume messages from IBM MQ. I wrote a custom receiver for MQ, but the problem I have is that I need to convert the RDDs of the JavaDStream into DataFrames. To do that I iterate over the JavaDStream with foreachRDD and define a schema for the DataFrame, but when I run the job the first message throws the following exception:
java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition cannot be cast to org.apache.spark.rdd.ParallelCollectionPartition
    at org.apache.spark.rdd.ParallelCollectionRDD.compute(ParallelCollectionRDD.scala:102)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/03/28 12:53:26 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.rdd.BlockRDDPartition cannot be cast to org.apache.spark.rdd.ParallelCollectionPartition
    at org.apache.spark.rdd.ParallelCollectionRDD.compute(ParallelCollectionRDD.scala:102)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
After that, the code runs fine. It is only the first message after I start the job, and it happens even when there are no messages in MQ.
This is my CustomMQReceiver:
public CustomMQReceiver() {
super(StorageLevel.MEMORY_ONLY_2());
}
@Override
public void onStart() {
new Thread() {
@Override
public void run() {
try {
initConnection();
receive();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}.start();
}
@Override
public void onStop() {
}
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
Message receivedMessage = null;
while (!isStopped() && (receivedMessage = consumer.receiveNoWait()) != null) {
String userInput = convertStreamToString(receivedMessage);
System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
} catch (Exception e) {
e.printStackTrace();
restart("Trying to connect again");
} catch (Throwable t) {
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException {
MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
conFactory.setHostName(HOST);
conFactory.setPort(PORT);
conFactory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
conFactory.setQueueManager(QMGR);
conFactory.setChannel(CHANNEL);
conFactory.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
conFactory.setStringProperty(WMQConstants.USERID, APP_USER);
conFactory.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
qCon = (MQQueueConnection) conFactory.createConnection();
MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue = (MQQueue) qSession.createQueue(QUEUE_NAME);
consumer = (MQMessageConsumer) qSession.createConsumer(queue);
qCon.start();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
private static String convertStreamToString(final Message jmsMsg) throws Exception {
String stringMessage = "";
JMSTextMessage msg = (JMSTextMessage) jmsMsg;
stringMessage = msg.getText();
return stringMessage;
}
And this is my Spark code:
SparkConf sparkConf = new SparkConf()
.setAppName("MQStreaming")
.set("spark.driver.allowMultipleContexts", "true")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
final SQLContext sqlContext = new SQLContext(jsc);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(propertiesConf.getProperty("duration"))));
JavaDStream<String> customReceiverStream = ssc.receiverStream(new CustomMQReceiver());
customReceiverStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) throws Exception {
JavaRDD<Row> rddRow = rdd.map(new Function<String, Row>() {
@Override
public Row call(String v1) throws Exception {
return RowFactory.create(v1);
}
});
try {
StructType schema = new StructType(new StructField[]{
new StructField("trama", DataTypes.StringType, true, Metadata.empty())
});
DataFrame frame = sqlContext.createDataFrame(rddRow, schema);
if (frame.count() > 0) {
//Here is where the first messages throw the exception
frame.show();
frame.write().mode(SaveMode.Append).json("file:///C:/tmp/");
}
} catch (Exception ex) {
System.out.println(" INFO " + ex.getMessage());
}
}
});
ssc.start();
ssc.awaitTermination();
I can't change the Spark version, because this job will run on an old Cloudera cluster with Spark 1.6. I don't know whether I am doing something wrong or it is just a bug. Help!!!
I solved my own problem. The exception was thrown because of the way I was creating the SQLContext; the correct way is to create the SQLContext from the JavaStreamingContext's SparkContext:
//JavaStreamingContext jsc = ...
SQLContext sqlContext = new SQLContext(jsc.sparkContext());
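For completeness, a minimal sketch of how the driver setup from the question changes with this fix (the rest of the foreachRDD code stays as shown above; the 5000 ms batch duration is a placeholder for the value read from the properties file):

SparkConf sparkConf = new SparkConf()
        .setAppName("MQStreaming")
        .setMaster("local[*]");
// One context only: the JavaStreamingContext creates and owns the
// SparkContext, so spark.driver.allowMultipleContexts is no longer needed.
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(5000));
// Derive the SQLContext from that same SparkContext instead of building a
// separate JavaSparkContext, so the receiver's BlockRDDs and the SQL layer
// share one context.
final SQLContext sqlContext = new SQLContext(ssc.sparkContext());

With two separate contexts, the SQLContext belonged to a context that only ever saw parallelized collections, which is consistent with the BlockRDDPartition/ParallelCollectionPartition cast failure on the first streamed batch.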