Casting a Cassandra timestamp column as timeuuid
I am getting events from Kafka and storing them in Cassandra. I parse JSON containing the fields eventID, sessionID, timestamp and userID to create the columns of a Cassandra table, like so:
cassandra@cqlsh> CREATE TABLE mydata.events (
... "event_date" date,
... "eventID" text,
... "userID" text,
... timestamp timeuuid,
... "sessionID" text,
... "fullJson" text,
... PRIMARY KEY ("event_date", timestamp, "sessionID")
... );
In the code:
case class cassandraFormat(
eventID: String,
sessionID: String,
timeuuid: UUID, // timestamp as timeuuid
userID: String,
event_date: LocalDate, // YYYY-MM-dd format
fullJson: String // full json from Kafka
)
I need to add the timestamp column as a timeuuid. Since I am parsing it from JSON, I extract all the values from the header and create the columns this way:
val allJson = rdd.
map(x => {
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
//use serialization default to format a Map to JSON
(x, Serialization.write(x))
}).
filter(x => x._1 isDefinedAt "header").
map(x => (x._1("header"), x._2)).
filter(x => (x._1 isDefinedAt "userID") &&
(x._1 isDefinedAt "eventID") &&
(x._1 isDefinedAt "sessionID") &&
(x._1 isDefinedAt "timestamp")).
map(x => cassandraFormat(x._1("eventID").toString,
x._1("sessionID").toString,
com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
x._1("userID").toString,
com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
x._2))
This part:
com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)
is producing the error:
java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
I even tried:
java.util.UUID.fromString(x._1("timestamp").toString)
which produces the same error.
How do I correctly cast/convert the timestamp to a timeuuid and insert it into Cassandra through a Spark job?
You have a string that is not a number, and you are trying to turn it into a number with toLong, hence the exception.
Looking at this, it seems you can use the following method to get a UUID from a given timestamp:
public static UUID getTimeUUID(long when)
You have to parse the string into a DateTime or an Instant, and then pass the DateTime/Instant's milliseconds to getTimeUUID.
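As a minimal sketch of that parsing step (plain java.time, no Spark or driver dependencies), the timestamp string from the error message can be turned into epoch milliseconds like this; note that the pattern letter `Z` accepts a colon-less offset such as `+0000`, which the stock `ISO_OFFSET_DATE_TIME` formatter would reject:

```scala
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

object TimestampToMillis {
  // Matches strings like "2019-05-09T09:00:52.553+0000"
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

  // Parse the event timestamp string and return epoch milliseconds.
  def toMillis(ts: String): Long =
    ZonedDateTime.parse(ts, formatter).toInstant.toEpochMilli
}
```

The resulting Long is what you would then hand to getTimeUUID (or UUIDs.startOf) instead of calling toLong on the raw string.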
I managed to do it by converting the timestamp format into a DateTime and then into millis, and then generating the uuid:
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.UUID

import com.datastax.driver.core.utils.UUIDs

val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)
val random = new java.util.Random() // randomizes the node half of each generated timeuuid
val allJson = rdd.
map(x => {
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
//use serialization default to format a Map to JSON
(x, Serialization.write(x))
}).
filter(x => x._1 isDefinedAt "header").
map(x => (x._1("header"), x._2)).
filter(x => (x._1 isDefinedAt "userID") &&
(x._1 isDefinedAt "eventID") &&
(x._1 isDefinedAt "sessionID") &&
(x._1 isDefinedAt "timestamp")).
map(x => {
var millis: Long = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
try {
val dateStr: String = x._1("timestamp").asInstanceOf[String]
// timestamp from event json
// create DateTime from Timestamp string
val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
// create millis from DateTime
millis = dateTime.toInstant.toEpochMilli
} catch {
case e: Exception =>
e.printStackTrace()
}
// generate timeuuid
val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, random.nextLong)
// generate eventDate
val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
cassandraFormat(x._1("eventID").toString,
x._1("sessionID").toString,
uuid,
x._1("userID").toString,
eventDate,
x._2)
})
allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
The timestamp column in Cassandra now looks like this: 58976340-7313-11e9-910d-60dce7513b94
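As an aside, the event time can be read back out of such a version-1 UUID with nothing but the JDK. A small sketch, where 0x01B21DD213814000 is the well-known offset, in 100-ns units, between the UUID epoch (1582-10-15) and the Unix epoch:

```scala
import java.time.Instant
import java.util.UUID

object TimeuuidInspect {
  // 100-ns intervals between the Gregorian epoch (1582-10-15) and the Unix epoch
  private val GregorianOffset = 0x01B21DD213814000L

  // Recover epoch millis from a version-1 (time-based) UUID;
  // UUID.timestamp() throws for any other UUID version.
  def unixMillis(u: UUID): Long = (u.timestamp() - GregorianOffset) / 10000L
}
```

For example, Instant.ofEpochMilli(TimeuuidInspect.unixMillis(UUID.fromString("58976340-7313-11e9-910d-60dce7513b94"))) recovers the instant encoded in the stored value.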
I solved this with UDFs:
import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.udf
val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))
val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)
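Conceptually, UUIDs.startOf(millis) and UUIDs.unixTimestamp(uuid) are inverse encodings of a version-1 UUID's 60-bit timestamp field. The sketch below shows the idea using only java.util.UUID; it is an illustration under my reading of the format, not the driver's actual implementation:

```scala
import java.util.UUID

object TimeuuidSketch {
  // 100-ns intervals between the Gregorian epoch (1582-10-15) and the Unix epoch
  private val GregorianOffset = 0x01B21DD213814000L

  // Build a version-1 UUID whose time fields encode `millis`,
  // with minimal clock-sequence/node bits (similar to UUIDs.startOf).
  def startOf(millis: Long): UUID = {
    val ts      = millis * 10000L + GregorianOffset      // 60-bit value in 100-ns units
    val timeLow = ts & 0xFFFFFFFFL
    val timeMid = (ts >>> 32) & 0xFFFFL
    val timeHi  = (ts >>> 48) & 0x0FFFL
    val msb     = (timeLow << 32) | (timeMid << 16) | 0x1000L | timeHi // 0x1000 = version 1
    new UUID(msb, 0x8000000000000000L)                   // IETF variant, zero clock-seq and node
  }

  // Invert the encoding (similar to UUIDs.unixTimestamp).
  def unixTimestamp(u: UUID): Long = (u.timestamp() - GregorianOffset) / 10000L
}
```

The round trip unixTimestamp(startOf(ms)) == ms holds, which is exactly the property the two UDFs above rely on when converting between java.sql.Timestamp and the timeuuid string stored in Cassandra.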