Avro 写入 java.sql.Timestamp 转换错误
Avro write java.sql.Timestamp conversion error
我需要将时间戳写入 Kafka 分区,然后从中读取。我已经为此定义了一个 Avro 模式:
{ "namespace":"sample",
"type":"record",
"name":"TestData",
"fields":[
{"name": "update_database_time", "type": "long", "logicalType": "timestamp-millis"}
]
}
但是,我在 producer.send 行中收到转换错误:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
我该如何解决这个问题?
下面是将时间戳写入Kafka的代码:
val tmstpOffset = testDataDF
.select("update_database_time")
.orderBy(desc("update_database_time"))
.head()
.getTimestamp(0)
val avroRecord = new GenericData.Record(parseAvroSchemaFromFile("/avro-offset-schema.json"))
avroRecord.put("update_database_time", tmstpOffset)
val producer = new KafkaProducer[String, GenericRecord](kafkaParams().asJava)
val data = new ProducerRecord[String, GenericRecord]("app_state_test7", avroRecord)
producer.send(data)
Avro 不直接支持timestamp 的时间,但逻辑上支持long。因此,您可以将其转换为 long 并按如下方式使用。 unix_timestamp() 函数用于转换,但如果您有特定的日期格式,请使用 unix_timestamp(col, dataformat) 重载函数。
import org.apache.spark.sql.functions._
val tmstpOffset = testDataDF
.select((unix_timestamp("update_database_time")*1000).as("update_database_time"))
.orderBy(desc("update_database_time"))
.head()
.getTimestamp(0)
我需要将时间戳写入 Kafka 分区,然后从中读取。我已经为此定义了一个 Avro 模式:
{ "namespace":"sample",
"type":"record",
"name":"TestData",
"fields":[
{"name": "update_database_time", "type": "long", "logicalType": "timestamp-millis"}
]
}
但是,我在 producer.send 行中收到转换错误:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
我该如何解决这个问题?
下面是将时间戳写入Kafka的代码:
val tmstpOffset = testDataDF
.select("update_database_time")
.orderBy(desc("update_database_time"))
.head()
.getTimestamp(0)
val avroRecord = new GenericData.Record(parseAvroSchemaFromFile("/avro-offset-schema.json"))
avroRecord.put("update_database_time", tmstpOffset)
val producer = new KafkaProducer[String, GenericRecord](kafkaParams().asJava)
val data = new ProducerRecord[String, GenericRecord]("app_state_test7", avroRecord)
producer.send(data)
Avro 不直接支持timestamp 的时间,但逻辑上支持long。因此,您可以将其转换为 long 并按如下方式使用。 unix_timestamp() 函数用于转换,但如果您有特定的日期格式,请使用 unix_timestamp(col, dataformat) 重载函数。
import org.apache.spark.sql.functions._
val tmstpOffset = testDataDF
.select((unix_timestamp("update_database_time")*1000).as("update_database_time"))
.orderBy(desc("update_database_time"))
.head()
.getTimestamp(0)