如何在 Spark 的 Cassandra 查询中使用 java.time.LocalDate?
How to use java.time.LocalDate in Cassandra query from Spark?
我们在 Cassandra 中有一个 table,列 start_time
类型为 date
。
当我们执行以下代码时:
val resultRDD = inputRDD.joinWithCassandraTable(KEY_SPACE,TABLE)
.where("start_time = ?", java.time.LocalDate.now)
我们收到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object 2016-10-13 of type class java.time.LocalDate to com.datastax.driver.core.LocalDate.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:45)
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$$anonfun$convertPF.applyOrElse(TypeConverter.scala:449)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:439)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$.convert(TypeConverter.scala:439)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF.applyOrElse(TypeConverter.scala:788)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:771)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:771)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun.apply(BoundStatementBuilder.scala:93)
我尝试根据 documentation:
注册自定义转换器
object JavaLocalDateToCassandraLocalDateConverter extends TypeConverter[com.datastax.driver.core.LocalDate] {
def targetTypeTag = typeTag[com.datastax.driver.core.LocalDate]
def convertPF = {
case ld: java.time.LocalDate => com.datastax.driver.core.LocalDate.fromYearMonthDay(ld.getYear, ld.getMonthValue, ld.getDayOfMonth)
case _ => com.datastax.driver.core.LocalDate.fromYearMonthDay(1971, 1, 1)
}
}
object CassandraLocalDateToJavaLocalDateConverter extends TypeConverter[java.time.LocalDate] {
def targetTypeTag = typeTag[java.time.LocalDate]
def convertPF = { case ld: com.datastax.driver.core.LocalDate => java.time.LocalDate.of(ld.getYear(), ld.getMonth(), ld.getDay())
case _ => java.time.LocalDate.now
}
}
TypeConverter.registerConverter(JavaLocalDateToCassandraLocalDateConverter)
TypeConverter.registerConverter(CassandraLocalDateToJavaLocalDateConverter)
但是没有用。
如何在从 Spark 执行的 Cassandra 查询中使用 JDK8 Date/Time 类?
Cassandra 日期格式为 yyyy-MM-dd HH:mm:ss.SSS
所以你可以使用下面的代码,如果你使用 Java 8 将 Cassandra 日期转换为 LocalDate
,那么你可以按照你的逻辑去做。
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val dateTime = LocalDateTime.parse(cassandraDateTime, formatter);
或者您可以将 LocalDate 转换为 Cassandra 日期格式并进行检查。
我认为在像这样的 where 子句中最简单的事情就是调用
sc
.cassandraTable("test","test")
.where("start_time = ?", java.time.LocalDate.now.toString)
.collect`
只需传入字符串,因为这将是一个定义明确的转换。
TypeConverter 中似乎存在问题,您的转换器没有优先于内置转换器。我会快速看一下。
--编辑--
注册的转换器似乎没有正确传输到执行器。在本地模式下,代码按预期工作,这让我认为这是一个序列化问题。对于这个问题,我会在 Spark Cassandra Connector 上开一张票。
我们在 Cassandra 中有一个 table,列 start_time
类型为 date
。
当我们执行以下代码时:
val resultRDD = inputRDD.joinWithCassandraTable(KEY_SPACE,TABLE)
.where("start_time = ?", java.time.LocalDate.now)
我们收到以下错误:
com.datastax.spark.connector.types.TypeConversionException: Cannot convert object 2016-10-13 of type class java.time.LocalDate to com.datastax.driver.core.LocalDate.
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:45)
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert.apply(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$$anonfun$convertPF.applyOrElse(TypeConverter.scala:449)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:439)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$LocalDateConverter$.convert(TypeConverter.scala:439)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF.applyOrElse(TypeConverter.scala:788)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:771)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:56)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:771)
at com.datastax.spark.connector.writer.BoundStatementBuilder$$anonfun.apply(BoundStatementBuilder.scala:93)
我尝试根据 documentation:
注册自定义转换器object JavaLocalDateToCassandraLocalDateConverter extends TypeConverter[com.datastax.driver.core.LocalDate] {
def targetTypeTag = typeTag[com.datastax.driver.core.LocalDate]
def convertPF = {
case ld: java.time.LocalDate => com.datastax.driver.core.LocalDate.fromYearMonthDay(ld.getYear, ld.getMonthValue, ld.getDayOfMonth)
case _ => com.datastax.driver.core.LocalDate.fromYearMonthDay(1971, 1, 1)
}
}
object CassandraLocalDateToJavaLocalDateConverter extends TypeConverter[java.time.LocalDate] {
def targetTypeTag = typeTag[java.time.LocalDate]
def convertPF = { case ld: com.datastax.driver.core.LocalDate => java.time.LocalDate.of(ld.getYear(), ld.getMonth(), ld.getDay())
case _ => java.time.LocalDate.now
}
}
TypeConverter.registerConverter(JavaLocalDateToCassandraLocalDateConverter)
TypeConverter.registerConverter(CassandraLocalDateToJavaLocalDateConverter)
但是没有用。
如何在从 Spark 执行的 Cassandra 查询中使用 JDK8 Date/Time 类?
Cassandra 日期格式为 yyyy-MM-dd HH:mm:ss.SSS
所以你可以使用下面的代码,如果你使用 Java 8 将 Cassandra 日期转换为 LocalDate
,那么你可以按照你的逻辑去做。
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
val dateTime = LocalDateTime.parse(cassandraDateTime, formatter);
或者您可以将 LocalDate 转换为 Cassandra 日期格式并进行检查。
我认为在像这样的 where 子句中最简单的事情就是调用
sc
.cassandraTable("test","test")
.where("start_time = ?", java.time.LocalDate.now.toString)
.collect`
只需传入字符串,因为这将是一个定义明确的转换。
TypeConverter 中似乎存在问题,您的转换器没有优先于内置转换器。我会快速看一下。
--编辑--
注册的转换器似乎没有正确传输到执行器。在本地模式下,代码按预期工作,这让我认为这是一个序列化问题。对于这个问题,我会在 Spark Cassandra Connector 上开一张票。