Ignite 和 Kafka 集成

Ignite and Kafka Integration

我正在尝试 Ignite 和 Kafka 集成将 kafka 消息带入 Ignite 缓存。

我的消息密钥是一个随机字符串(要与 Ignite 一起使用,kafka 消息密钥不能为 null),值是一个 json 字符串表示形式,代表 Person(java class)

当 Ignite 收到这样的消息时,Ignite 会使用消息的键(在我的例子中是随机字符串)作为缓存键。

是否可以将消息键更改为该人的id,以便我可以将其放入缓存。

看起来 streamer.receiver(new StreamReceiver) 可行

        streamer.receiver(new StreamReceiver<String, String>() {
            public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException {
                for (Map.Entry<String, String> entry : entries) {
                    Person p = fromJson(entry.getValue());
                    //ignore the message key,and use person id as the cache key
                    cache.put(p.getId(), p);
                }
            }
        });

这是推荐的方式吗?我不确定在 StreamReceiver 中调用 cache.put 是否是正确的方法,因为它只是写入缓存之前的预处理步骤。

Data streamer 会将您的所有密钥映射到缓存关联节点,创建条目批次并将批次发送到关联节点。在它 StreamReceiver 将收到您的条目后,获取 Person 的 ID 并调用 cache.put(K, V)。放置条目导致将您的密钥映射到相应的缓存亲和节点并向该节点发送更新请求。

一切看起来都很好。但是从 Kafka 映射你的随机密钥的结果和映射 Person 的 ID 的结果会不同(很可能是不同的节点)。结果,由于冗余的网络跃点,您的性能会很差。

不幸的是,当前的 KafkaStreamer 实现不支持流元组提取器(参见 StreamSingleTupleExtractor class)。但是您可以使用现有的 Kafka 流媒体实现作为示例轻松创建自己的 Kafka 流媒体实现。

您也可以尝试使用 KafkaStreamerkeyDecodervalDecoder 来从 Kafka 消息中提取 Person 的 ID。我不确定,但它可以提供帮助。