将 cassandra 时间戳列转换为 timeuuid

Casting cassandra timestamp column as timeuuid

我正在从 Kafka 获取事件并存储到 Cassandra。解析包含字段 eventID, sessionID, timestamp, userIDjson 以创建 Cassandra table 的列,如下所示:

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")

在代码中:

case class cassandraFormat(
                       eventID: String, 
                       sessionID: String,
                       timeuuid: UUID, // timestamp as timeuuid
                       userID: String,
                       event_date: LocalDate, // YYYY-MM-dd format
                       fullJson: String // full json from Kafka
                     )

我需要将 timestamp 列添加为 timeuuid。因为我是从 json 解析的,所以从 header 中提取所有值并以这种方式创建列:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp").
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

这部分:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

正在生成错误

java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

甚至尝试过: java.util.UUID.fromString(x._1("timestamp").toString, 也产生相同的错误。 如何正确 cast/convert timestamp 作为 timeuuid 并通过 spark job

插入 Cassandra

您有一个不是数字的字符串,您正在尝试使用 toLong 将其转换为数字。因此例外。

查看 this,您似乎可以使用此方法根据某个时间戳获取 UUID:

public static UUID getTimeUUID(long when)

您必须将字符串解析为 DateTimeInstant,然后将 DateTime/ Instant 的毫秒数传递给 getTimeUUID

我设法做到了,将 timestamp 格式转换为 dateTimemillis,然后生成 uuid

val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)

val allJson = rdd.
              map(x => {
                implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
                //use serialization default to format a Map to JSON
                (x, Serialization.write(x))
              }).
              filter(x => x._1 isDefinedAt "header").
              map(x => (x._1("header"), x._2)).
              filter(x => (x._1 isDefinedAt "userID") &&
                (x._1 isDefinedAt "eventID") &&
                (x._1 isDefinedAt "sessionID") &&
                (x._1 isDefinedAt "timestamp").
              map(x => {
                var millis: Long  = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
                try {
                  val dateStr: String = x._1("timestamp").asInstanceOf[String]
                  // timestamp from event json
                  // create DateTime from Timestamp string
                  val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
                  // create millis from DateTime
                  millis = dateTime.toInstant.toEpochMilli
                } catch {
                  case e: Exception =>
                    e.printStackTrace()
                }
                // generate timeuuid
                val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, random.nextLong)
                // generate eventDate
                val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
                cassFormat(x._1("eventID").toString,
                  x._1("sessionID").toString,
                  uuid,
                  x._1("userID").toString,
                  eventDate,
                  x._2)
              })
            allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
        }
      })

timestamp cassandra 中的列现在看起来像:58976340-7313-11e9-910d-60dce7513b94

我已经用 UDF 解决了这个问题。

import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.udf
 
val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))
 
val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)