Spark Structured Streaming: Output result at the end of Tumbling Window and not the Batch
I want the output of my Spark stream to be sent to the sink at the end of the tumbling window, not at each batch interval.
I am reading from one Kafka stream and writing the output to another Kafka stream.
The query and the code that writes the output are as follows:
Dataset<Row> sqlResult = session.sql("select window, user, sum(amount) as amount from users where type = 'A' group by window(timestamp, '1 minute', '1 minute'), user");
sqlResult = sqlResult.select(to_json(struct("window", "user", "amount")).as("value"));
StreamingQuery query = sqlResult.writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "aggregated-topic")
        .option("checkpointLocation", "c:/tmp")
        .outputMode(OutputMode.Update())
        .start();
When I send multiple records for a particular user within a 1-minute window, I want a single sum of those events at the end of the minute.
Instead, I get multiple outputs on the output Kafka stream, with intermediate aggregates written to it.
For example, I send the following 7 records within one 1-minute window, spaced out over the minute:
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
>{ "id" : 123, "type": "A", "user": "tukaram", "amount": 10}
The output I get looks like this:
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":10.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":20.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":40.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":60.0}
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}
As you can see, the outputs all belong to the same window, but there are multiple of them.
What I want is a single output at the end of the minute:
{"window":{"start":"2020-09-18T14:35:00.000+05:30","end":"2020-09-18T14:36:00.000+05:30"},"user":"tukaram","amount":70.0}
How can I achieve this?
You need to set a processing-time trigger when writing the stream to the sink.
Use the DataStreamWriter's .trigger(Trigger.ProcessingTime(...)) with an interval matching your window, so each 1-minute window is processed in a single micro-batch and written once:
StreamingQuery query = sqlResult.writeStream()
        .trigger(Trigger.ProcessingTime("1 minute")) // fire one micro-batch per minute
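Putting the trigger together with the rest of the writer from the question, the full call would look like this. This is a sketch: `sqlResult` and the Kafka options are taken from the question, and `Trigger` comes from `org.apache.spark.sql.streaming`.

```java
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query = sqlResult.writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "aggregated-topic")
        .option("checkpointLocation", "c:/tmp")
        .outputMode(OutputMode.Update())
        // Fire one micro-batch per minute, so each tumbling window's
        // records are aggregated in a single batch and written once.
        .trigger(Trigger.ProcessingTime("1 minute"))
        .start();
```

Note that a processing-time trigger only controls how often micro-batches run; it does not align batches with event-time window boundaries. If you need output strictly when the event-time window closes, Spark's watermarking (`withWatermark`) combined with Append output mode is the mechanism designed for that.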