ZonedDateTime 的 Flink 序列化
Flink Serialization of ZonedDateTime
我必须处理时区和纳秒时间分辨率。因此我使用 ZonedDateTime。显然 Apache Flink 没有正确序列化 ZonedDateTime。它确实按预期序列化了 LocalDateTime 部分,但是,它忘记了处理时区。
例如,当我在 Flink 流映射函数中记录分区日期时,我总是得到类似
的信息
2018-03-01T04:10:30.773471918null
而在数据开始时我得到了正确的区域
2018-03-01T04:10:30.773471918-05:00
null 指的是区域。后来当然我得到一个空指针异常,因为我必须使用适当的时间比较,这需要区域。
我怎样才能最简单地解决这个问题?谢谢回复。
我不完全理解为什么它不选择序列化程序。这个解决方案至少有效:我为 ZonedDateTime
实现了一个 Kryo 序列化器
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime
class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
setImmutable(true)
override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
ZonedDateTimeSerializer.write(out, obj)
}
override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
ZonedDateTimeSerializer.read(in)
}
}
object ZonedDateTimeSerializer {
def write(out: Output, obj: ZonedDateTime): Unit = {
LocalDateSerializer.write(out, obj.toLocalDate)
LocalTimeSerializer.write(out, obj.toLocalTime)
ZoneIdSerializer.write(out, obj.getZone)
}
def read(in: Input): ZonedDateTime = {
val date = LocalDateSerializer.read(in)
val time = LocalTimeSerializer.read(in)
val zone = ZoneIdSerializer.read(in)
ZonedDateTime(date, time, zone)
}
}
我从最新的实现中获取了实现 Kyro。
然后我注册如下:
env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])
这似乎解决了问题。不确定它是否来自我使用 timesforscala 的事实,但我想使用这个库,因为它添加了我依赖的重要补充。欢迎评论。
我必须处理时区和纳秒时间分辨率。因此我使用 ZonedDateTime。显然 Apache Flink 没有正确序列化 ZonedDateTime。它确实按预期序列化了 LocalDateTime 部分,但是,它忘记了处理时区。
例如,当我在 Flink 流映射函数中记录分区日期时,我总是得到类似
的信息 2018-03-01T04:10:30.773471918null
而在数据开始时我得到了正确的区域
2018-03-01T04:10:30.773471918-05:00
null 指的是区域。后来当然我得到一个空指针异常,因为我必须使用适当的时间比较,这需要区域。
我怎样才能最简单地解决这个问题?谢谢回复。
我不完全理解为什么它不选择序列化程序。这个解决方案至少有效:我为 ZonedDateTime
实现了一个 Kryo 序列化器import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime
class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
setImmutable(true)
override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
ZonedDateTimeSerializer.write(out, obj)
}
override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
ZonedDateTimeSerializer.read(in)
}
}
object ZonedDateTimeSerializer {
def write(out: Output, obj: ZonedDateTime): Unit = {
LocalDateSerializer.write(out, obj.toLocalDate)
LocalTimeSerializer.write(out, obj.toLocalTime)
ZoneIdSerializer.write(out, obj.getZone)
}
def read(in: Input): ZonedDateTime = {
val date = LocalDateSerializer.read(in)
val time = LocalTimeSerializer.read(in)
val zone = ZoneIdSerializer.read(in)
ZonedDateTime(date, time, zone)
}
}
我从最新的实现中获取了实现 Kyro。 然后我注册如下:
env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])
这似乎解决了问题。不确定它是否来自我使用 timesforscala 的事实,但我想使用这个库,因为它添加了我依赖的重要补充。欢迎评论。