结构化流处理多个查询?
Structured Streaming process multiple query?
我使用 Spark Streaming 来处理在线需求,例如每小时的新用户数是这样的:
每个批次,当日志到来时,然后select来自外部table的uid,例如hbase或dynamodb,如果不存在,则插入table
这种方法使用 table 太频繁了,费用太高了。
现在想用结构化流来解决这个问题
以下sql可以离线解决问题:
sql1
create table event_min_table as select pageid,uid,floor(min(time)/36000)*3600 as event_time from event_table group by pageid,uid
sql2
select pageid,count(distinct uid) as cnt from event_min_table group by pageid,event_time
由于我对structured streaming不熟悉,structured streaming不支持多重聚合,所以我是这样使用的:
readStream
创建查询为 sql1
然后在内存中注册为 table 输出模式为 complete
从使用 sql2
的 table 创建查询,输出格式为 update
,保存到外部 table,如 hbase 或 dynamodb
不知道我的方法能不能解决问题,但是有几个问题:
如果我在complete
输出模式下创建内存table,数据会随着时间的推移而变大吗?
虽然这样也行,但是每次log来的时候是否输出结果,所以问题还是没有解决,我的目标是减少对外部的请求table,例如 hbase 或 dynamodb
1) if I create a memory table as complete output mode, the data will bigger as the time goes on?
我不这么认为(参见 the code)。
my goal is to decrease the request to the external table
您可以使用 KeyValueGroupedDataset.flatMapGroupsWithState 运算符完全控制在状态存储中保留的内容、时间和时间:
flatMapGroupsWithState Applies the given function to each group of data, while maintaining a user-defined per-group state. The result Dataset will represent the objects returned by the function. For a static batch Dataset, the function will be invoked once per group. For a streaming Dataset, the function will be invoked for each group repeatedly in every trigger, and updates to each group's state will be saved across invocations.
这是您在结构化流中对过去和当前数据集可以获得的最大控制。
我使用 Spark Streaming 来处理在线需求,例如每小时的新用户数是这样的:
每个批次,当日志到来时,然后select来自外部table的uid,例如hbase或dynamodb,如果不存在,则插入table
这种方法使用 table 太频繁了,费用太高了。
现在想用结构化流来解决这个问题
以下sql可以离线解决问题:
sql1
create table event_min_table as select pageid,uid,floor(min(time)/36000)*3600 as event_time from event_table group by pageid,uid
sql2
select pageid,count(distinct uid) as cnt from event_min_table group by pageid,event_time
由于我对structured streaming不熟悉,structured streaming不支持多重聚合,所以我是这样使用的:
readStream
创建查询为sql1
然后在内存中注册为 table 输出模式为complete
从使用
sql2
的 table 创建查询,输出格式为update
,保存到外部 table,如 hbase 或 dynamodb
不知道我的方法能不能解决问题,但是有几个问题:
如果我在
complete
输出模式下创建内存table,数据会随着时间的推移而变大吗?虽然这样也行,但是每次log来的时候是否输出结果,所以问题还是没有解决,我的目标是减少对外部的请求table,例如 hbase 或 dynamodb
1) if I create a memory table as complete output mode, the data will bigger as the time goes on?
我不这么认为(参见 the code)。
my goal is to decrease the request to the external table
您可以使用 KeyValueGroupedDataset.flatMapGroupsWithState 运算符完全控制在状态存储中保留的内容、时间和时间:
flatMapGroupsWithState Applies the given function to each group of data, while maintaining a user-defined per-group state. The result Dataset will represent the objects returned by the function. For a static batch Dataset, the function will be invoked once per group. For a streaming Dataset, the function will be invoked for each group repeatedly in every trigger, and updates to each group's state will be saved across invocations.
这是您在结构化流中对过去和当前数据集可以获得的最大控制。