How to write datetime with microseconds to Cassandra with Spark?

I want to stream a specific date format into a Cassandra datetime column.

My incoming dates have the following format:

"%Y-%m-%dT%H:%M:%S.%f"

e.g. "2021-05-18T11:12:13.123456"

My Cassandra table is:

CREATE TABLE table_name (
  id text,
  timestamp timestamp,
  PRIMARY KEY (id)
);

The ingestion part of my Spark job looks like this:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkahost)
    .option("subscribe", kafkatopic)
    .option("startingOffsets", "latest")
    .load()

val jsonSchema = StructType(
      List(
        StructField("id", StringType, true),
        StructField("timestamp", StringType, true)
      )
    )
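
For context, an incoming Kafka message value matching this schema would look something like this (a made-up example): {"id": "abc-1", "timestamp": "2021-05-18T11:12:13.123456"}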

val cassandraDF = df
    .withColumn("value", col("value").cast(StringType))
    .withColumn("json", from_json(col("value"), jsonSchema))
    .select("json.*")

At this point cassandraDF has two columns, id and timestamp, both of type StringType.
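
A quick printSchema confirms it:

    cassandraDF.printSchema()
    // root
    //  |-- id: string (nullable = true)
    //  |-- timestamp: string (nullable = true)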

Then writing to Cassandra is straightforward:

cassandraDF
    .writeStream
    .outputMode(OutputMode.Append)
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", cassandra_keyspace)
    .option("table", cassandra_table)
    .option("checkpointLocation", "/tmp/checkpoints/cassandra")
    .start()
    .awaitTermination()

When I stream data, I get the following ERROR:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.IllegalArgumentException: Invalid date: 2021-05-18T11:12:13.123456

So I tried streaming other date formats, for example one without microseconds:

"%Y-%m-%dT%H:%M:%S"

e.g. "2021-05-18T11:12:13"

and it streams without any problems. This date format is accepted.

My question is: how can I get my original date format with microseconds into Cassandra?

Cassandra supports only millisecond resolution for the timestamp type. Writing a string into a timestamp field is not allowed by default, but the Spark Cassandra Connector has implicit conversions, like this. And if you look into this source code, you will see that it only supports parsing timestamps from strings with millisecond resolution.

So the solution is to cast your timestamp column from string to a Spark timestamp (only on Spark 3.x, where support for microsecond resolution was added):

import spark.implicits._  // for the $"..." column syntax

val cassandraDF = df
    .withColumn("value", col("value").cast(StringType))
    .withColumn("json", from_json(col("value"), jsonSchema))
    .select("json.*")
    .withColumn("timestamp", $"timestamp".cast("timestamp"))

But in any case, it will be stored in Cassandra with millisecond precision only.
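
You can see both halves of this from spark-shell; a quick non-streaming check (assuming import spark.implicits._):

    Seq("2021-05-18T11:12:13.123456").toDF("ts")
        .select($"ts".cast("timestamp").as("parsed"))
        .show(false)
    // +--------------------------+
    // |parsed                    |
    // +--------------------------+
    // |2021-05-18 11:12:13.123456|
    // +--------------------------+

Spark itself keeps the full microsecond value; it is Cassandra that keeps only the first three fraction digits (.123) once the row is written.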