KStream聚合固定列表(限制大小)
KStream aggregate fixed list (limited size)
我想知道是否可以(默认支持)为从特定主题收到的特定批次消息添加聚合。
#description:我有一些玩家正在寻找自动创建的大厅。这个想法是玩家搜索游戏并将带有 { id, mod, map } 的消息发送到主题。每个 mod/map 都包含一个规则,可以允许多少玩家加入,并根据该数量我尝试汇总这些玩家。当大厅(游戏)的状态看起来准备好启动大厅(10/10 玩家)时,我们应该将详细信息转换为单个消息并将其进一步推送以进行处理,同时应该刷新聚合并从 0 开始。
KStream<String, List<String>> gameLobbyEventStream =
streamsBuilder.stream(
"GAME_MATCHMAKING_LOBBY_EVENT-TOPIC",
Consumed.with(Serdes.String(), Serdes.serdeFrom(String.class)))
.peek((k, v) -> log.info("key:{}, value:{}", k, v))
.groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(
ArrayList::new,
(k, v, agg) -> {
agg.add(v);
return agg;
},
Materialized.with(
Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())))
.toStream();
gameLobbyEventStream
.filter((k, v) -> isReadyToStartLobby(v))
.peek((k, v) -> log.info("lobby is creating for {}", v))
// convert to lobby { id, players, details }
.to("LAUNCH_MATCHMAKING_LOBBY_EVENT-TOPIC");
#real case: mode: 死亡竞赛 (3x3 : 6 玩家允许)
-> player-1 clicked to find a game
-> player-2 clicked to find a game
-> player-3 clicked to find a game
-> player-4 clicked to find a game
-> player-5 clicked to find a game
-> player-6 clicked to find a game
-> lobby has been created for players above {aggregation finished}
-> player-7 clicked to find a game
-> player-8 clicked to find a game
-> {aggregation status: **2** of **6** to create new lobby}
#note:没有时间window。如果玩家想找到一个大厅,我必须让他等到他想等多久,直到他取消搜索。
我正在聚合列表<>,并希望在聚合(列表)大小等于 mod 所选玩家的预期数量时创建一个大厅。
我面临的问题是聚合在没有重置的情况下一遍又一遍地增加,我看到数百名玩家在等待大厅开始。我期望的行为类似于将玩家数量推到大厅并进一步处理的块,同时下一个块应该再次将玩家从零聚合到预期玩家数量作为新大厅。
下面是处理玩家列表预期聚合的代码。这里的问题是我们可能需要停止对特定玩家的搜索(他发送了一个事件来停止搜索),这增加了拒绝您最近加入的大厅的难度。
(如果我们通过弹出菜单通知客户并以 accepting/canceling 大厅作为批准已经在大厅中的这些玩家的决定的最终决定,则可以选择管理案例)
.aggregate(ArrayList::new,
(k, v, agg) -> {
if (agg.size() == 5) {
return new ArrayList<>(List.of(v));
}
agg.add(v);
return agg;
}
我想知道是否可以(默认支持)为从特定主题收到的特定批次消息添加聚合。
#description:我有一些玩家正在寻找自动创建的大厅。这个想法是玩家搜索游戏并将带有 { id, mod, map } 的消息发送到主题。每个 mod/map 都包含一个规则,可以允许多少玩家加入,并根据该数量我尝试汇总这些玩家。当大厅(游戏)的状态看起来准备好启动大厅(10/10 玩家)时,我们应该将详细信息转换为单个消息并将其进一步推送以进行处理,同时应该刷新聚合并从 0 开始。
KStream<String, List<String>> gameLobbyEventStream =
streamsBuilder.stream(
"GAME_MATCHMAKING_LOBBY_EVENT-TOPIC",
Consumed.with(Serdes.String(), Serdes.serdeFrom(String.class)))
.peek((k, v) -> log.info("key:{}, value:{}", k, v))
.groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(
ArrayList::new,
(k, v, agg) -> {
agg.add(v);
return agg;
},
Materialized.with(
Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())))
.toStream();
gameLobbyEventStream
.filter((k, v) -> isReadyToStartLobby(v))
.peek((k, v) -> log.info("lobby is creating for {}", v))
// convert to lobby { id, players, details }
.to("LAUNCH_MATCHMAKING_LOBBY_EVENT-TOPIC");
#real case: mode: 死亡竞赛 (3x3 : 6 玩家允许)
-> player-1 clicked to find a game
-> player-2 clicked to find a game
-> player-3 clicked to find a game
-> player-4 clicked to find a game
-> player-5 clicked to find a game
-> player-6 clicked to find a game
-> lobby has been created for players above {aggregation finished}
-> player-7 clicked to find a game
-> player-8 clicked to find a game
-> {aggregation status: **2** of **6** to create new lobby}
#note:没有时间window。如果玩家想找到一个大厅,我必须让他等到他想等多久,直到他取消搜索。
我正在聚合列表<>,并希望在聚合(列表)大小等于 mod 所选玩家的预期数量时创建一个大厅。
我面临的问题是聚合在没有重置的情况下一遍又一遍地增加,我看到数百名玩家在等待大厅开始。我期望的行为类似于将玩家数量推到大厅并进一步处理的块,同时下一个块应该再次将玩家从零聚合到预期玩家数量作为新大厅。
下面是处理玩家列表预期聚合的代码。这里的问题是我们可能需要停止对特定玩家的搜索(他发送了一个事件来停止搜索),这增加了拒绝您最近加入的大厅的难度。 (如果我们通过弹出菜单通知客户并以 accepting/canceling 大厅作为批准已经在大厅中的这些玩家的决定的最终决定,则可以选择管理案例)
.aggregate(ArrayList::new,
(k, v, agg) -> {
if (agg.size() == 5) {
return new ArrayList<>(List.of(v));
}
agg.add(v);
return agg;
}