Akka 流滑动 window 控制 reduce emit to sink by SourceQueue
Akka stream sliding window to control reduce emit to sink by SourceQueue
更新 : 我把我的问题放在 test project 来详细解释我的意思
============================================= ========================
我有 Akka 源代码,可以继续从数据库中读取 table,然后按一些键进行分组,然后减少它。然而,在我应用 reduce 功能后,数据似乎永远不会发送到接收器,它会继续减少,因为上游总是有数据来。
我读了一些 post,并尝试了 groupedWithin 和滑动,但它并没有像我想的那样工作,它只是将消息分组到更大的部分,但从不使上游暂停并发送到接收器。以下是 Akka stream 2.5.2
中的代码
Source reduce代码:
source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();
水槽和运行:
Sink<Object, CompletionStage<Done>> sink =
Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);
我也试过.takeWhile(predicated);我用timer来切换断言值true和false,但是好像只会把第一个切换到false,当我切换回true时并没有重新启动upstream。
请帮助我提前谢谢!
============================================= ====
更新
information about the type of elements
添加我想要的:
我有 class 调用 SystemCodeTracking
包含 2 个属性 (id, entityName)
我将有对象列表:(1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")
我想 groupBy entityName 然后求和 id ,因此,我想看到的结果如下
("table1" 1+4),("table3", 3+5),("table2", 2)
我现在做的代码如下
source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName)
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId()))
.scan(....)
我现在的问题更多是关于如何构建扫描初始状态
我应该怎么做?
scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
所以你想要的是,如果我理解一切都很好:
- 首先,按id分组
- 然后按时间分组window,在这个时间里面window,把所有的
systemCodeTracking.getId()
加起来
对于第一部分,您需要 groupBy
。对于第二部分 groupedWithin
。但是,它们的工作方式不同:第一个会给你子流,而第二个会给你一个列表流。
因此,我们将不得不以不同的方式处理它们。
首先,让我们为您的列表编写一个 reducer:
private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
if (list.isEmpty()) {
throw new Exception();
} else {
SystemCodeTracking building = list.get(0);
building.setId(0L);
list.forEach(next -> building.setId(building.getId() + next.getId()));
return building;
}
}
因此,对于列表中的每个元素,我们递增 building.id
以在遍历整个列表后获得我们想要的值。
现在你只需要做
Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
.groupBy(20000, SystemCodeTracking::getEntityName) // group by name
.groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100)
.filterNot(List::isEmpty) // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer)
.map(this::reduceList) // reduce each list to sum the ids
.log("====== doing reduceing ") // log each passing element using akka logger, rather than `System.out.println`
.mergeSubstreams() // merge back all elements with different names
更新 : 我把我的问题放在 test project 来详细解释我的意思
============================================= ========================
我有 Akka 源代码,可以继续从数据库中读取 table,然后按一些键进行分组,然后减少它。然而,在我应用 reduce 功能后,数据似乎永远不会发送到接收器,它会继续减少,因为上游总是有数据来。
我读了一些 post,并尝试了 groupedWithin 和滑动,但它并没有像我想的那样工作,它只是将消息分组到更大的部分,但从不使上游暂停并发送到接收器。以下是 Akka stream 2.5.2
中的代码Source reduce代码:
source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();
水槽和运行:
Sink<Object, CompletionStage<Done>> sink =
Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);
我也试过.takeWhile(predicated);我用timer来切换断言值true和false,但是好像只会把第一个切换到false,当我切换回true时并没有重新启动upstream。
请帮助我提前谢谢!
============================================= ====
更新
information about the type of elements
添加我想要的:
我有 class 调用 SystemCodeTracking
包含 2 个属性 (id, entityName)
我将有对象列表:(1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")
我想 groupBy entityName 然后求和 id ,因此,我想看到的结果如下
("table1" 1+4),("table3", 3+5),("table2", 2)
我现在做的代码如下
source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName)
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId()))
.scan(....)
我现在的问题更多是关于如何构建扫描初始状态 我应该怎么做?
scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
所以你想要的是,如果我理解一切都很好:
- 首先,按id分组
- 然后按时间分组window,在这个时间里面window,把所有的
systemCodeTracking.getId()
加起来
对于第一部分,您需要 groupBy
。对于第二部分 groupedWithin
。但是,它们的工作方式不同:第一个会给你子流,而第二个会给你一个列表流。
因此,我们将不得不以不同的方式处理它们。
首先,让我们为您的列表编写一个 reducer:
private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
if (list.isEmpty()) {
throw new Exception();
} else {
SystemCodeTracking building = list.get(0);
building.setId(0L);
list.forEach(next -> building.setId(building.getId() + next.getId()));
return building;
}
}
因此,对于列表中的每个元素,我们递增 building.id
以在遍历整个列表后获得我们想要的值。
现在你只需要做
Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
.groupBy(20000, SystemCodeTracking::getEntityName) // group by name
.groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100)
.filterNot(List::isEmpty) // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer)
.map(this::reduceList) // reduce each list to sum the ids
.log("====== doing reduceing ") // log each passing element using akka logger, rather than `System.out.println`
.mergeSubstreams() // merge back all elements with different names