How to load Collection data types using spark-cassandra connector in batch mode

I am trying to load a Spark DataFrame that has two collection-typed attributes into a Cassandra table.

In the incoming feed file these attributes arrive as text/String. I convert the String values to List and Map types respectively with the code below:

    // parse(...) presumably comes from a JSON library such as json4s,
    // e.g. import org.json4s.native.JsonMethods.parse
    spark.udf.register("getLst", (input: String) => input.split(",").toList)

    spark.udf.register("getMap", (input: String) => parse(input).values.asInstanceOf[Map[String, String]])

    val ofr_data_final=spark.sql("""select  
            ...
            getLst(acct_nb_ls) as acct_nb_ls, 
            getMap(brw_eci_and_sts_mp) as brw_eci_and_sts_mp, 
            .....""")
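
The two UDF bodies can be exercised outside Spark. The `parse(...)` call in `getMap` appears to come from a JSON library such as json4s; the sketch below substitutes a naive stand-in parser for flat JSON objects just to illustrate the intended String-to-Map shape (an assumption for illustration, not the original parsing logic):

```scala
object ConversionSketch {
  // Mirrors getLst: comma-separated text -> List[String]
  def getLst(input: String): List[String] = input.split(",").toList

  // Naive stand-in for json4s' parse(input).values: handles flat
  // {"k":"v",...} objects only; the real job should keep using a JSON library.
  def getMap(input: String): Map[String, String] =
    input.trim.stripPrefix("{").stripSuffix("}")
      .split(",")
      .filter(_.nonEmpty)
      .map { kv =>
        val Array(k, v) = kv.split(":", 2)
        k.trim.stripPrefix("\"").stripSuffix("\"") ->
          v.trim.stripPrefix("\"").stripSuffix("\"")
      }
      .toMap
}
```

For example, `ConversionSketch.getLst("123,456")` yields `List("123", "456")`, matching the `array<string>` column the schema below reports.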

The printed schema of the Spark DataFrame shows the two attributes as follows:

    |-- acct_nb_ls: array (nullable = true)
    |    |-- element: string (containsNull = true)
    |-- brw_eci_and_sts_mp: map (nullable = true)
    |    |-- key: string
    |    |-- value: string (valueContainsNull = true)

In Cassandra, the two columns are defined as follows:

    acct_nb_ls FROZEN<LIST<text>>,
    brw_eci_and_sts_mp FROZEN<MAP<text, text>>,

This is my load statement:

    ofr_data_final.rdd.saveToCassandra(Config.keySpace,offerTable, writeConf = WriteConf(ttl = TTLOption.perRow("ttl")))

But the load fails with the following error:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 140 in stage 24.0 failed 4 times, most recent failure: Lost task 140.3 in stage 24.0 (TID 1741, bdtcstr70n12.svr.us.jpmchase.net, executor 9): java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
    at com.jpmc.mars.LoadOfferData$.delayedEndpoint$com$jpmc$mars$LoadOfferData(LoadOfferData.scala:246)
    at com.jpmc.mars.LoadOfferData$delayedInit$body.apply(LoadOfferData.scala:22)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.App$$anonfun$main.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.jpmc.mars.LoadOfferData$.main(LoadOfferData.scala:22)
    at com.jpmc.mars.LoadOfferData.main(LoadOfferData.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to write statements to mars_offerdetails.offer_detail_2.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:167)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I suspect the issue may be that the attribute acct_nb_ls is being inferred as 'array' rather than 'list', but I am not sure how to make Spark infer it as 'list' instead of 'array'. In my UDF I explicitly call

    input.split(",").toList

but it is still inferred as an array.

Loading collection data types with the spark-cassandra connector in batch mode, using rdd.saveToCassandra with the per-row TTL option, worked as expected. The problem was with the data: the records were old and had already expired, which produced negative TTL values and caused the load to fail.
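
The failure mode comes down to simple arithmetic: a per-row TTL derived from an already-past expiry timestamp comes out negative, which Cassandra rejects at write time. A minimal sketch of a pre-write guard (the `expiryEpoch` field and the `filterLive` helper are illustrative assumptions, not code from the original job):

```scala
object TtlGuard {
  // Per-row TTL in seconds, derived from an expiry timestamp (epoch seconds).
  // Old, already-expired records yield a negative value here.
  def ttlSeconds(expiryEpoch: Long, nowEpoch: Long): Long = expiryEpoch - nowEpoch

  // Drop rows whose computed TTL is not positive before calling saveToCassandra;
  // negative TTL values fail the write, so expired rows must be filtered out.
  def filterLive(rows: Seq[(String, Long)], nowEpoch: Long): Seq[(String, Long)] =
    rows.collect {
      case (id, expiry) if ttlSeconds(expiry, nowEpoch) > 0 =>
        (id, ttlSeconds(expiry, nowEpoch))
    }
}
```

With a guard like this applied to the DataFrame before the save, rows carrying a non-positive TTL never reach the writer, and the remaining rows keep their remaining lifetime in the per-row "ttl" column.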

The Spark error message should be enhanced to hint at this.