kafka-streams 会话 windows 保留期

kafka-streams session windows retention period

我正在会话 windows 上进行 POC 以满足我的流要求之一。为此,我使用 Session windows 因为这符合我必须将具有唯一事务 ID 的事件聚合到列表中的要求。每个事务可以有多个不同应用产生的事件,推送给kafka。下面是代码

    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<String> stringSerde = Serdes.String();
    Serde<Transaction> transactionSerde = StreamsSerdes.TransactionSerde();

    Aggregator<String,Transaction, List<Transaction>> agg = (key, value, list)
            -> {
        list.add(value);
        return list;
    };

    Merger<String, List<Transaction>> merger = (key, v1, v2) ->
       Stream.concat(v1.stream(), v2.stream())
               .collect(Collectors.toList());

    Materialized<String,List<Transaction>,SessionStore<Bytes, byte[]>>
            materialized = Materialized.<String,List<Transaction>>as(Stores
            .persistentSessionStore
            ("trans-store", 1000 * 30)).withKeySerde(stringSerde).withValueSerde(StreamsSerdes
            .TransactionsListSerde());


    Initializer<List<Transaction>> init = () -> new ArrayList<>();

    StreamsBuilder builder = new StreamsBuilder();
    KTable<Windowed<String>, List<Transaction>> customerTransactionCounts =
             builder.stream(TRANSACTIONS_TOPIC, Consumed.with(stringSerde, transactionSerde).withOffsetResetPolicy(LATEST))
            .groupBy((noKey, transaction) -> transaction.getCustomerId(),
                    Serialized.with(stringSerde, transactionSerde))
            .windowedBy(SessionWindows.with(10000).until(1000 * 30))
                     .aggregate(init,agg,merger,materialized);


    customerTransactionCounts.toStream().print(Printed.<Windowed<String>, List<Transaction>>toSysOut()
            .withLabel("Customer Transactions List").withKeyValueMapper((key, list) ->
                    ("Current Time " + new Date().toString() + " Customer Id - " + key.key()  +
                    " START " +
                    new Date
                    (key.window().start()).toString() + " --- END " + new Date(key.window().end()).toString()+ "  " + list)));


    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
    kafkaStreams.cleanUp();

此处的保留期如何运作??

1) 首先我提取了一些交易 ID X 的数据,事件日期范围从

开始 4 月 16 日星期一 22:25:40 2018 年美国东部时间 --- 结束 4 月 16 日星期一 22:25:49 2018 年美国东部时间

所有这些都被汇总到同一会话中。

2) 接下来我摄取单条记录交易 ID X 时间 开始 2018 年 4 月 16 日星期一 22:26:45 美国东部时间。

我按预期在提交间隔后看到 1 条记录

根据我的理解,流时间更改为 22:26:45。此时,上面提取的记录应该从状态存储中过期,因为结束时间 < 流时间 - 保留期(30 秒)

3) 接下来我摄取了与第一组事件属于同一时间范围的记录单个记录事务 id X。我在提交间隔后看到了第一步的所有记录和聚合结果中的新当前记录。

第一组记录是否应该从状态存储中过期,因为它们已经过了保留期???

在第三步中,我假设我只会得到一个聚合记录,因为应该删除较旧的记录。保留期何时开始以从状态存储中删除记录?

保留时间是 "minimum" -- 为了更有效地过期,数据存储在所谓的段中(基于时间间隔),当段中的所有数据都经过保留时间时,段就会过期。