Spark Streaming, foreachRDD error : Comparison method violates its general contract
I am testing the Spark Streaming API.
The application is deployed on an Amazon EMR cluster running Spark 1.4.0.
It sorts data and saves the resulting files to S3.
The pipeline code (excluding the sorting algorithm) is shown below:
public KinesisPreProcessPipeline(JavaStreamingContext jssc, final KinesisPreProcessModuleConfiguration moduleConfiguration) {
    JavaReceiverInputDStream<byte[]> inputDStream = KinesisUtils.createStream(jssc,
            moduleConfiguration.getAppName(), moduleConfiguration.getStreamName(),
            "kinesis." + moduleConfiguration.getRegion() + ".amazonaws.com", moduleConfiguration.getRegion(),
            InitialPositionInStream.LATEST, Durations.seconds(5), StorageLevel.MEMORY_AND_DISK_SER());

    JavaDStream<StreamingMessage> messageJavaDStream = inputDStream.map(new Function<byte[], StreamingMessage>() {
        @Override
        public StreamingMessage call(byte[] bytes) throws Exception {
            return jsonParser.fromJson(new String(bytes), StreamingMessage.class);
        }
    });

    final String destinationFolder = moduleConfiguration.getDestinationFolder();
    StreamingPreProcessPipeline pipeline = new StreamingPreProcessPipeline()
            .withInputDStream(messageJavaDStream)
            .withPreProcessStep(new SortPreProcess());
    JavaDStream<StreamingMessage> output = pipeline.execute();
    output.checkpoint(Durations.seconds(moduleConfiguration.getBatchInterval() * 2));

    JavaDStream<String> messagesAsJson = output.map(new Function<StreamingMessage, String>() {
        @Override
        public String call(StreamingMessage message) throws Exception {
            return jsonParser.toJson(message);
        }
    });

    messagesAsJson.foreachRDD(new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) throws Exception {
            rdd.saveAsTextFile(destinationFolder + "/" + dateFormat.print(new DateTime()) + "-" + rdd.id());
            return null;
        }
    });
}
When the application runs on the cluster, it fails quickly with the following error:
15/07/17 13:17:36 ERROR executor.Executor: Exception in task 0.1 in stage 8.0 (TID 90)
java.lang.IllegalArgumentException: Comparison method violates its general contract!
at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:776)
at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:507)
at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:435)
at org.apache.spark.util.collection.TimSort$SortState.access0(TimSort.java:307)
at org.apache.spark.util.collection.TimSort.sort(TimSort.java:135)
at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
at org.apache.spark.util.collection.PartitionedPairBuffer.partitionedDestructiveSortedIterator(PartitionedPairBuffer.scala:70)
at org.apache.spark.util.collection.ExternalSorter.partitionedIterator(ExternalSorter.scala:690)
at org.apache.spark.util.collection.ExternalSorter.iterator(ExternalSorter.scala:708)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:64)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The error occurs during the foreachRDD step, but I am still trying to work out why it fails...
The class used for sorting had a bug in its compareTo implementation.
The javadoc for Comparable recommends implementing compareTo consistently with equals(). After fixing this bug, the Spark job works as expected.
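The original SortPreProcess comparator is not shown, so the following is only a hypothetical illustration of how a compareTo can break the Comparable contract and trigger TimSort's "Comparison method violates its general contract!" error. A common cause is comparing int fields by subtraction, which can overflow so that sgn(a.compareTo(b)) is no longer the negation of sgn(b.compareTo(a)):

```java
// Hypothetical example: a contract-violating comparison vs. a correct one.
// The class and field names are illustrative, not from the original code.
public class ContractDemo {

    // Broken: int subtraction can overflow, producing an inconsistent ordering.
    // For a = Integer.MIN_VALUE and b = 1, a - b wraps to a positive value,
    // so the comparator claims MIN_VALUE > 1.
    static int brokenCompare(int a, int b) {
        return a - b;
    }

    // Fixed: Integer.compare never overflows and defines a total order.
    static int fixedCompare(int a, int b) {
        return Integer.compare(a, b);
    }

    public static void main(String[] args) {
        int a = Integer.MIN_VALUE, b = 1;
        // Broken comparator says MIN_VALUE is greater than 1 (overflow):
        System.out.println(brokenCompare(a, b) > 0); // prints "true"
        // Fixed comparator orders them correctly:
        System.out.println(fixedCompare(a, b) < 0);  // prints "true"
    }
}
```

TimSort (used by Spark's shuffle sorter and by java.util.Arrays.sort) actively detects such inconsistencies during merges and throws IllegalArgumentException, which is why the failure surfaces deep inside the shuffle rather than in your own code.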