Aggregate data from a Kafka topic and upload it to a new topic
import org.apache.spark.sql.types._

val jsonSchema = StructType(Array(
  StructField("event_type", StringType),
  StructField("category", StringType),
  StructField("item_id", StringType),
  StructField("item_price", IntegerType),
  StructField("uid", StringType),
  StructField("timestamp", LongType)
))
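For illustration, a single input record matching this schema might look like the following (the values are made up; the timestamp is assumed to be in milliseconds, which matters for the conversion in the answer below):

{"event_type":"purchase","category":"electronics","item_id":"i-42","item_price":1200,"uid":"u-17","timestamp":1577865700000}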
I have a Kafka topic whose JSON values follow the schema above. I need to write hourly aggregates of this data, starting from the earliest timestamp, into another Kafka topic. I understand I need the update output mode together with a window, but I don't see how to put it together correctly.
I assume I want to read the stream like this:
val newData = spark
.readStream
.format("kafka")
.options(kafkaParams)
.load
.select(from_json($"value".cast("string"), jsonSchema).alias("value"))
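For context, kafkaParams is not shown in the question; a minimal sketch of the source options it would need is below. The broker address and topic name are placeholders, and startingOffsets is set to earliest so the query processes from the earliest available data:

// Hypothetical Kafka source options; broker and topic names are placeholders.
val kafkaParams = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",  // placeholder broker list
  "subscribe"               -> "events",          // placeholder source topic
  "startingOffsets"         -> "earliest"         // read from the earliest offsets
)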
But I really don't understand how to turn it into new values in JSON like this:
{"start_ts":1577865600,"end_ts":1577869200,"revenue": sum of item_price,"visitors": count of uids},
{"start_ts":1577869200,"end_ts":1577872800,"revenue":sum of item_price,"visitors":count of uids},
...
You can do it like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// newData is assumed here to be the raw Kafka stream (i.e. without the final
// from_json select from the question), so the binary value column is parsed here.
val parseddf = newData
  .select('value.cast("string"))
  .withColumn("value", from_json(col("value"), jsonSchema))
  .select(col("value.*"))
  // the source timestamp is in milliseconds, so convert it to a proper timestamp
  .withColumn("timestamp", from_unixtime($"timestamp" / 1000).cast(TimestampType))

val uData = parseddf
  .withWatermark("timestamp", "60 minutes")
  .na.fill("undefined")
  // count a visitor for every event that has a uid
  .withColumn("uid", when($"uid" === "undefined", 0).otherwise(1))
  .groupBy(window($"timestamp", "60 minutes"))
  .agg(sum("item_price") as "revenue", sum("uid") as "visitors")
  .withColumn("start_ts", unix_timestamp($"window.start"))
  .withColumn("end_ts", unix_timestamp($"window.end"))
  // the Kafka sink expects a value column, so serialize the result to JSON
  .withColumn("value", to_json(struct($"start_ts", $"end_ts", $"revenue", $"visitors")))
  .drop("window", "revenue", "visitors", "start_ts", "end_ts")
  .writeStream
  .outputMode("update")
  .format("kafka")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("kafka.bootstrap.servers", s"$server")
  .option("checkpointLocation", s"$checkpoint")
  .option("topic", s"$kafka_topic")
  .start()
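Note that the Kafka sink only writes the value column (and optionally key), which is why everything else is dropped before writeStream. To keep the application alive until the query stops:

// Block the driver until the streaming query stops or fails.
uData.awaitTermination()

To sanity-check what lands in the output topic, it can also be read back as a one-off batch query, for example from a separate spark-shell (this sketch reuses the same server and topic variables as above):

spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", s"$server")
  .option("subscribe", s"$kafka_topic")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .show(false)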