KStream聚合固定列表(限制大小)

KStream aggregate fixed list (limited size)

我想知道是否可以(默认支持)为从特定主题收到的特定批次消息添加聚合。

#description:我有一些玩家正在寻找自动创建的大厅。这个想法是玩家搜索游戏并将带有 { id, mod, map } 的消息发送到主题。每个 mod/map 都包含一个规则,可以允许多少玩家加入,并根据该数量我尝试汇总这些玩家。当大厅(游戏)的状态看起来准备好启动大厅(10/10 玩家)时,我们应该将详细信息转换为单个消息并将其进一步推送以进行处理,同时应该刷新聚合并从 0 开始。

KStream<String, List<String>> gameLobbyEventStream =
    streamsBuilder.stream(
            "GAME_MATCHMAKING_LOBBY_EVENT-TOPIC",
            Consumed.with(Serdes.String(), Serdes.serdeFrom(String.class)))
        .peek((k, v) -> log.info("key:{}, value:{}", k, v))
        .groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
            ArrayList::new,
            (k, v, agg) -> {
              agg.add(v);
              return agg;
            },
            Materialized.with(
                Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())))
        .toStream();

gameLobbyEventStream
    .filter((k, v) -> isReadyToStartLobby(v))
    .peek((k, v) -> log.info("lobby is creating for {}", v))
    // convert to lobby { id, players, details }
    .to("LAUNCH_MATCHMAKING_LOBBY_EVENT-TOPIC");

#real case: mode: 死亡竞赛 (3x3 : 6 玩家允许)

 -> player-1 clicked to find a game
 -> player-2 clicked to find a game   
 -> player-3 clicked to find a game  
 -> player-4 clicked to find a game  
 -> player-5 clicked to find a game
 -> player-6 clicked to find a game

 -> lobby has been created for players above {aggregation finished}

 -> player-7 clicked to find a game
 -> player-8 clicked to find a game
 -> {aggregation status: **2** of **6** to create new lobby}

#note:没有时间window。如果玩家想找到一个大厅,我必须让他等到他想等多久,直到他取消搜索。

我正在聚合列表<>,并希望在聚合(列表)大小等于 mod 所选玩家的预期数量时创建一个大厅。

我面临的问题是聚合在没有重置的情况下一遍又一遍地增加,我看到数百名玩家在等待大厅开始。我期望的行为类似于将玩家数量推到大厅并进一步处理的块,同时下一个块应该再次将玩家从零聚合到预期玩家数量作为新大厅。

下面是处理玩家列表预期聚合的代码。这里的问题是我们可能需要停止对特定玩家的搜索(他发送了一个事件来停止搜索),这增加了拒绝您最近加入的大厅的难度。 (如果我们通过弹出菜单通知客户并以 accepting/canceling 大厅作为批准已经在大厅中的这些玩家的决定的最终决定,则可以选择管理案例)

.aggregate(ArrayList::new,
    (k, v, agg) -> {
        if (agg.size() == 5) {
          return new ArrayList<>(List.of(v)); 
        }
       agg.add(v);
      return agg; 
}