Aggregate data from a Kafka topic and write it to a new topic

import org.apache.spark.sql.types._

// Schema of the JSON payload stored in the Kafka "value" field
val jsonSchema = StructType(Array(
    StructField("event_type", StringType),
    StructField("category", StringType),
    StructField("item_id", StringType),
    StructField("item_price", IntegerType),
    StructField("uid", StringType),
    StructField("timestamp", LongType)
))
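
For reference, a single record on the source topic would then look roughly like this (the field values below are made up, and I am assuming timestamp is in epoch milliseconds, which is what the /1000 conversion in the answer implies):

{"event_type":"purchase","category":"electronics","item_id":"item_42","item_price":1500,"uid":"user_123","timestamp":1577865723000}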

I have a Kafka topic whose JSON values follow the schema above. I need to write hourly aggregated data, starting from the earliest timestamp, to another Kafka topic. I know I need to use the update output mode together with window, but I don't understand how to do this the right way.

I assume I should read the stream like this:

import spark.implicits._
import org.apache.spark.sql.functions.from_json

val newData = spark
    .readStream
    .format("kafka")
    .options(kafkaParams)
    .load
    .select(from_json($"value".cast("string"), jsonSchema).alias("value"))
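
kafkaParams is not shown here; a minimal set of source options could look like this (the broker address and topic name are placeholders), with startingOffsets set to earliest so the query starts from the beginning of the topic as required:

// Hypothetical source options for the Kafka reader above
val kafkaParams = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",   // placeholder broker
    "subscribe"               -> "events",           // placeholder source topic
    "startingOffsets"         -> "earliest"          // read the topic from the beginning
)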

But I really don't understand how to transform this into new JSON values like these:

{"start_ts":1577865600,"end_ts":1577869200,"revenue": sum of item_price,"visitors": count of uids},
{"start_ts":1577869200,"end_ts":1577872800,"revenue":sum of item_price,"visitors":count of uids},
...

You can do it like this:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.streaming.Trigger

// Flatten the parsed struct and convert the epoch-millisecond timestamp
// into a proper TimestampType column so it can be used for windowing
val parseddf = newData
    .select(col("value.*"))
    .withColumn("timestamp", from_unixtime($"timestamp" / 1000).cast(TimestampType))

val uData = parseddf
    .withWatermark("timestamp", "60 minutes")
    // Replace missing uids, then turn uid into a 0/1 flag so that
    // sum("uid") counts the visitors per window
    .na.fill("undefined")
    .withColumn("uid", when($"uid" === "undefined", 0).otherwise(1))
    .groupBy(window($"timestamp", "60 minutes"))
    .agg(sum("item_price").as("revenue"), sum("uid").as("visitors"))
    // Expose the window bounds as epoch seconds and serialize the payload to JSON
    .withColumn("start_ts", unix_timestamp($"window.start"))
    .withColumn("end_ts", unix_timestamp($"window.end"))
    .withColumn("value", to_json(struct($"start_ts", $"end_ts", $"revenue", $"visitors")))
    .drop("window", "revenue", "visitors", "start_ts", "end_ts")
    .writeStream
    .outputMode("update")
    .format("kafka")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .option("kafka.bootstrap.servers", s"$server")
    .option("checkpointLocation", s"$checkpoint")
    .option("topic", s"$kafka_topic")
    .start()
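
To sanity-check the result, you can read the target topic back with a plain batch query; the broker address and topic name below are again placeholders:

// Hypothetical check: read the aggregated topic back as a batch DataFrame
val check = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "hourly_aggregates")
    .load
    .select($"value".cast("string"))

check.show(false)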