使用 Jackson 在 Java 中定义的 POJO 将 JSON 引入 Spark RDD

Ingesting JSON into Spark RDD's with POJOs defined by Jackson in Java

我正在尝试在 Java 中的 Jackson 库的帮助下将 JSON 文件从本地提取到 Spark RDD。 Schema 在 POJOs(Plain Old Java Objects) 中定义。但是我无法将数据加载到数据集或 JavaRDD.

我已经尝试了下面的代码,它打印了模式但是打印数据显示异常。

    public static void main(String[] args) {

    //SparkConf conf = new SparkConf().setMaster("spark://master:7077");
    SparkConf conf = new SparkConf().setMaster("local");
    SparkSession session = SparkSession.builder().config(conf).getOrCreate();
    System.out.println("Session Created");

    // Read the sellerAddress csv file, and create an RDD from it
    //JavaRDD<Row> sellerAddress = session.read().csv("/home/jubin/Documents/sellerAddress.csv").javaRDD();
    ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    RDD<String> blockData = session.sparkContext().textFile("/home/jubin/Documents/558098.json",0);
    JavaRDD<BlockArray> blockParser = blockData.toJavaRDD().map(v -> objectMapper.readValue(v, BlockArray.class));
    //blockParser.foreach(t -> System.out.println(t));

Session Created
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$$anonfun$apply.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$$anonfun$apply.apply(SQLContext.scala:1108)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows.apply(SQLContext.scala:1106)
    at scala.collection.Iterator$$anon.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2830)
    at org.apache.spark.sql.Dataset$$anonfun$count.apply(Dataset.scala:2829)
    at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
    at blockchain.spark.AddressTracker.main(AddressTracker.java:41)
Caused by: java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$$anonfun$apply.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows$$anonfun$apply.apply(SQLContext.scala:1108)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows.apply(SQLContext.scala:1108)
    at org.apache.spark.sql.SQLContext$$anonfun$beansToRows.apply(SQLContext.scala:1106)
    at scala.collection.Iterator$$anon.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

使用 lift-json 库帮助了我而不是 jackson。 否则 SparkSQL 也可以用于调用而不是 POJOs。