使用 Spark 读取 Elasticsearch 记录时的时间戳无效
Invalid timestamp when reading Elasticsearch records with Spark
使用带有 elasticsearch-hadoop 库的 Spark 读取 Elasticsearch 记录时,我得到无效的时间戳。我正在使用以下 Spark 代码读取记录:
val sc = spark.sqlContext
val elasticFields = Seq(
"start_time",
"action",
"category",
"attack_category"
)
sc.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'aggattack-2021.01')" )
val all = sc.sql(
s"""
|SELECT ${elasticFields.mkString(",")}
|FROM myIndex
|""".stripMargin)
all.show(2)
这导致以下结果:
+-----------------------+------+---------+---------------+
|start_time |action|category |attack_category|
+-----------------------+------+---------+---------------+
|1970-01-19 16:04:27.228|drop |udp-flood|DoS |
|1970-01-19 16:04:24.027|drop |others |DoS |
+-----------------------+------+---------+---------------+
但我希望有当年的时间戳,例如 2021-01-19 16:04:27.228
。在弹性中,start_time
字段具有以毫秒为单位的 unixtime 格式 -> start_time": 1611314773.641
问题出在 ElasticSearch 中的数据上。 start_time
字段被映射为 epoch_seconds
并包含带小数点后三位的纪元秒值(例如 1611583978.684
)。在我们将纪元时间转换为没有任何小数位的毫秒后,一切正常
使用带有 elasticsearch-hadoop 库的 Spark 读取 Elasticsearch 记录时,我得到无效的时间戳。我正在使用以下 Spark 代码读取记录:
val sc = spark.sqlContext
val elasticFields = Seq(
"start_time",
"action",
"category",
"attack_category"
)
sc.sql(
"CREATE TEMPORARY TABLE myIndex " +
"USING org.elasticsearch.spark.sql " +
"OPTIONS (resource 'aggattack-2021.01')" )
val all = sc.sql(
s"""
|SELECT ${elasticFields.mkString(",")}
|FROM myIndex
|""".stripMargin)
all.show(2)
这导致以下结果:
+-----------------------+------+---------+---------------+
|start_time |action|category |attack_category|
+-----------------------+------+---------+---------------+
|1970-01-19 16:04:27.228|drop |udp-flood|DoS |
|1970-01-19 16:04:24.027|drop |others |DoS |
+-----------------------+------+---------+---------------+
但我希望有当年的时间戳,例如 2021-01-19 16:04:27.228
。在弹性中,start_time
字段具有以毫秒为单位的 unixtime 格式 -> start_time": 1611314773.641
问题出在 ElasticSearch 中的数据上。 start_time
字段被映射为 epoch_seconds
并包含带小数点后三位的纪元秒值(例如 1611583978.684
)。在我们将纪元时间转换为没有任何小数位的毫秒后,一切正常