如何将 kafka 时间戳值作为列包含在 spark 结构化流中?
How to include kafka timestamp value as columns in spark structured streaming?
我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了值字段并制作了数据框。我的问题是,我需要获取时间戳字段(来自 kafka)以及其他列。
这是我当前的代码:
val kafkaDatademostr = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
.option("subscribe","csvstream")
.load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
.select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
"split(value,',')[1] as DFW",
"split(value,',')[2] as DTG",
"split(value,',')[3] as CDF",
"split(value,',')[4] as DFO",
"split(value,',')[5] as SAD",
"split(value,',')[6] as DER",
"split(value,',')[7] as time_for",
"split(value,',')[8] as fort")
如何从 kafka 获取时间戳并将其添加为列以及其他列?
时间戳包含在源架构中。只需添加一个 "select timestamp" 即可获得如下所示的时间戳。
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
在 Apache Spark 官方网页您可以找到指南:Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
在那里您可以找到有关从 Kafka 加载的 DataFrame 架构的信息。
来自 Kafka 源的每一行都有以下列:
- key - 消息密钥
- 值 - 消息值
- topic - 命名消息主题
- 分区 - 该消息来自的分区
- offset - 消息的偏移量
- 时间戳 - 时间戳
- timestampType 时间戳类型
以上所有栏目均可查询。
在您的示例中,您仅使用 value
,因此要获取时间戳,只需将 timestamp
添加到您的 select 语句中:
val allFields = kafkaDatademostr.selectExpr(
s"CAST(value AS STRING) AS csv",
s"CAST(key AS STRING) AS key",
s"topic as topic",
s"partition as partition",
s"offset as offset",
s"timestamp as timestamp",
s"timestampType as timestampType"
)
在我的 Kafka 案例中,我收到了 JSON 格式的值。其中包含实际数据以及原始事件时间而不是 kafka 时间戳。以下是架构。
val mySchema = StructType(Array(
StructField("time", LongType),
StructField("close", DoubleType)
))
为了使用 Spark Structured Streaming 的 watermarking 功能,我不得不将 time 字段转换为时间戳格式。
val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", mySchema).as("data"))
.select(col("data.time").cast("timestamp").alias("time"),col("data.close"))
现在您可以使用时间字段进行window操作以及加水印目的。
import spark.implicits._
val windowedData = df1.withWatermark("time","1 minute")
.groupBy(
window(col("time"), "1 minute", "30 seconds"),
$"close"
).count()
我希望这个答案能澄清。
我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了值字段并制作了数据框。我的问题是,我需要获取时间戳字段(来自 kafka)以及其他列。
这是我当前的代码:
val kafkaDatademostr = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
.option("subscribe","csvstream")
.load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
.select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
"split(value,',')[1] as DFW",
"split(value,',')[2] as DTG",
"split(value,',')[3] as CDF",
"split(value,',')[4] as DFO",
"split(value,',')[5] as SAD",
"split(value,',')[6] as DER",
"split(value,',')[7] as time_for",
"split(value,',')[8] as fort")
如何从 kafka 获取时间戳并将其添加为列以及其他列?
时间戳包含在源架构中。只需添加一个 "select timestamp" 即可获得如下所示的时间戳。
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
在 Apache Spark 官方网页您可以找到指南:Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
在那里您可以找到有关从 Kafka 加载的 DataFrame 架构的信息。
来自 Kafka 源的每一行都有以下列:
- key - 消息密钥
- 值 - 消息值
- topic - 命名消息主题
- 分区 - 该消息来自的分区
- offset - 消息的偏移量
- 时间戳 - 时间戳
- timestampType 时间戳类型
以上所有栏目均可查询。
在您的示例中,您仅使用 value
,因此要获取时间戳,只需将 timestamp
添加到您的 select 语句中:
val allFields = kafkaDatademostr.selectExpr(
s"CAST(value AS STRING) AS csv",
s"CAST(key AS STRING) AS key",
s"topic as topic",
s"partition as partition",
s"offset as offset",
s"timestamp as timestamp",
s"timestampType as timestampType"
)
在我的 Kafka 案例中,我收到了 JSON 格式的值。其中包含实际数据以及原始事件时间而不是 kafka 时间戳。以下是架构。
val mySchema = StructType(Array(
StructField("time", LongType),
StructField("close", DoubleType)
))
为了使用 Spark Structured Streaming 的 watermarking 功能,我不得不将 time 字段转换为时间戳格式。
val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", mySchema).as("data"))
.select(col("data.time").cast("timestamp").alias("time"),col("data.close"))
现在您可以使用时间字段进行window操作以及加水印目的。
import spark.implicits._
val windowedData = df1.withWatermark("time","1 minute")
.groupBy(
window(col("time"), "1 minute", "30 seconds"),
$"close"
).count()
我希望这个答案能澄清。