火花作业(scala)将类型日期写入Cassandra

spark job (scala) write type Date to Cassandra

我正在使用 DSE 5.1(spark 2.0.2.6 和 cassandra 3.10.0.1652)

我的卡桑德拉table:

CREATE TABLE ks.tbl (
   dk int,
   date date,
   ck int,
   val int,
PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);

具有以下数据:

 dk | date       | ck | val
----+------------+----+-----
  1 | 2017-01-01 |  1 | 100
  1 | 2017-01-01 |  2 | 200

我的代码必须读取此数据并写入相同的内容,但使用昨天的日期(编译成功):

package com.datastax.spark.example

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test extends App {

  val conf = new SparkConf().setAppName("DSE calculus app TEST")
  val sc = new SparkContext(conf)

  val yesterday= (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

  val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1")

  tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

  sc.stop()
  sys.exit(0)
}

当我运行这个应用程序时:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar

无法正确写入 Cassandra。日期变量似乎没有正确插入地图中。 我得到的错误是:

Error:
WARN  2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal.apply(TableWriter.scala:207)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal.apply(TableWriter.scala:175)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

但是,当我如下直接在map语句中插入日期(字符串)时,代码确实正确插入了数据:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

如果我将昨天设置为整数(自纪元以来的天数),它也会正确插入数据。这将是最优的,但无法让 'yesterday' 以这种方式运行

编辑:实际上,这并没有正确插入数据。无论我将 'yesterday' 设置为 1 还是 100,000,000,它总是插入纪元 ('1970-01-01)

失败的代码在 DSE Spark 控制台中表现正确,正如我所期望的那样。

我只是不知道我做错了什么。欢迎任何帮助。

EDIT2:excecutor 0 stderr log 确实表明它试图在日期列中插入 Null 值,这显然是不可能的,因为它是一个聚类列。

在为 Spark 作业编写代码时,重要的是要意识到特定变量的设置时间和序列化时间。让我们看一下 App trait 文档

中的注释

Caveats

It should be noted that this trait is implemented using the DelayedInit functionality, which means that fields of the object will not have been initialized before the main method has been executed.

这意味着当代码实际 运行 时,对 App 主体中使用的变量的引用可能未在执行器上初始化。

我的猜测是您编写的 lambda 包含对在应用程序的延迟初始化部分初始化的 val 的引用 class。这意味着执行程序上代码的序列化版本 运行 Main 方法获取值的未初始​​化版本(空)。

将常量切换为 lazy val(或将其移动到单独的对象或 class)将通过确保远程初始化值(惰性 val)或简单地序列化初始化来解决此问题(单独 class/object)。

我想我知道你的问题是什么。
您可能会看到完整的日志文件。您只需附上其中的一部分...
今天有类似的错误,when create keyspace with replication_factor: 3 when I had only one cassandra instance.

所以我改变了它,问题就消失了。

ALTER KEYSPACE "some_keyspace_name" WITH REPLICATION =
  { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

Here is my error.log file

以及日志的重要部分:

Logging.scala[logError]:72) - Failed to execute: com.datastax.spark.connector.writer.RichBoundStatement@4746499f
com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_QUORUM (2 required but only 1 alive)