Hazelcast Jet 是否支持以 Kafka 作为源的滚动数字作为 IMap 键?

Does Hazelcast Jet support rolling number as IMap key, with Kafka as a source?

我前段时间使用过 Hazelcast,这是我第一次使用 Hazelcast Jet,似乎对处理一些实时流很感兴趣,正在探索。

这里有一个情况,我将 Kafka topic 拉到 IMap 使用:

private static Pipeline buildPipelineForClientDataa() {
        Pipeline p = Pipeline.create();
        p.drawFrom(KafkaSources.kafka(
                props("bootstrap.servers", BOOTSTRAP_SERVERS, 
                        "key.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "value.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "auto.offset.reset", AUTO_OFFSET_RESET), 
                KAFKA_TOPIC))
        .withoutTimestamps()
        .drainTo(Sinks.map(SINK_CLINET_DATA));
        return p;
    }

好吧,我没有主题的关键。我应该选择将滚动号码指定为密钥吗?如果是这样,请帮助我使用该技术。谢谢

使用递增数字不适合 Jet,因为它是分布式系统。它适用于分区流,每个流分区应该是独立的。您需要通过非并行处理器路由所有项目。

您可以使用 UUID 或 Hazelcast 的 FlakeIdGenerator 作为键,但是如果作业重新启动并从快照偏移量重新处理 Kafka 主题,相同的项目将分配不同的键并将在目标地图中出现两次。

如果想在map中拥有每一项,可以使用Kafka的topic+partitionId+offset组合作为key:

p.drawFrom(KafkaSources.kafka(
    props(...),
    record -> Util.entry(
        Tuple3.tuple3(record.topic(), record.partition(), record.offset()),
        record.value()),
    KAFKA_TOPIC))

如果只有一个话题,可以省略话题。