KafkaStream KTable 转储

KafkaStream KTable dump

我的问题是关于在收到触发消息时转储其值满足特定条件的 KTable。

下面是这个问题的一个例子:

KTable - CurrentAccountBalance

   John    +10,
   Joe     -1,
   Alice   -2,
   Jill    +5,

我的要求是获取所有在传入事件中具有负余额的记录:FETCH_NEGATIVE_BALANCE_ENTRIES 到达 在不同的命令流上。

我的想法是这样的: 如果我们使用命令流在 CurrentAccountBalance KTable 上执行 leftJoin,我们可以转储 CurrentAccountBalance 的所有条目(可用于过滤器),但是,这不会发生。

leftJoin的ValueJoiner只接收右边的命令和左边的null(而不是所有的 CurrentAccountBalance 条目)。我错过了什么吗?

谢谢

KTable 连接中的单个输入消息将在 Kafka Stream 中产生一个(或零个)输出消息,因为 KTable 连接基于主键。

您可以做的是创建一个自定义 transformer 来接收命令并连接到 KTable 状态(如果指定 KTable 存储名称,您可以轻松地将状态连接到转换器).当收到命令时,您可以获得一个 state.all() 迭代器并搜索所有符合您条件的条目并通过 context.forward() 加上转换器下游的接收器发出它们。

这是我实现的转储满足特定条件的 KTable 条目的解决方案。在这里为将来寻找类似解决方案的其他人描述它。

  1. 创建了自定义映射器 (FlatMapper) 来生成满足条件的消息。我已将状态存储作为构造函数参数传递给此映射器。
  2. 创建了一个名为 'FLUSH_NEGATIVE_BALANCES' 的事件并在名为 commandStream 的新 KStream 上读取它
  3. 当收到这个新的 FLUSH 命令时,我调用在步骤 1 中创建的 flatmapper 来生成满足条件的消息(在我的例子中是负余额)
  4. 我将 flatmapper 的输出写入接收器流。

    /* 这是 flatmapper 的代码,步骤 1 在上面的描述中 */

    public class NegativeBalanceMapper 实现 KeyValueMapper>> { 私有最终 BusinessRuleValidator businessRuleValidator; private final StoreHolder storeHolder;

    public NegativeBalanceMapper(StoreHolder storeHolder, BusinessRuleValidator businessRuleValidator)
    {
        this.storeHolder = storeHolder;
        this.businessRuleValidator = businessRuleValidator;
    }
    
    public Iterable<KeyValue<String, BalanceEntry>> apply(String key, CommandEntry value)
    {
        List<KeyValue<String, BalanceEntry>> result = new ArrayList<>();
        if (value == null)
        {
            return result;
        }
    
        final ReadOnlyKeyValueStore<String, BalanceEntry> ledgerStore = storeHolder.getLedgerStore();
        if (ledgerStore != null)
        {
            KeyValueIterator<String, BalanceEntry> range = ledgerStore.all();
            while (range.hasNext())
            {
                KeyValue<String, BalanceEntry> next = range.next();
                if (businessRuleValidator.isNegativeBalance(next.value))
                {
                    result.add(new KeyValue<>(next.key, next.value));
                }
            }    
        }
        return result;
    }
    

    }

    /这里是接收到FLUSH命令时的调用方式——上面step2/ 最终 KStream negativeBalances = filteredCommandStream.flatMap(negativeBalanceMapper);

    /这里我把它推到上面的 sink-step4/

    negativeBalances.to(KafkaConstants.TOPIC_NEGATIVE_BALANCES,Produced.with(Serdes.String(), serdeRegistry.getBalanceEntrySerde()));

感谢 Matthias 的指导。