groupByKey 创建重新分区主题,即使没有密钥更改

groupByKey creates repartition topic even though there is no key change

我正在尝试在 kafka 流 (Kafka 1.0.1) 和 spring 云流 (2.0.0-build-snapshot) 的帮助下实现一个简单的事件源服务。我的 StreamListener 方法只读取与我的聚合状态变化对应的事件的 Kstream,并将它们应用于聚合并将最新状态保存在本地状态存储(kafka 提供的状态存储)中。域事件消息还具有与聚合的 uuid(String) 相同的键。这是代码:

@StreamListener(Channels.EVENTS_INPUT_CHANNEL)
public void listen(KStream<String, DomainEvent> stream) {
    Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
    Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);
    stream
        .groupByKey(Serialized.with(Serdes.String(), domainEventSerde))
        .aggregate(
                Slot::new, 
                (s, domainEvent, slot) -> slot.handle(domainEvent),
                Materialized.<String, Slot, KeyValueStore<Bytes, byte[]>>
                as(Repository.SNAPSHOTS_FOR_SLOTS)
                    .withKeySerde(Serdes.String()).withValueSerde(slotSerde)
        );
}

上面的代码产生了一个变更日志主题(如预期的那样):slot-service-slots-changelog。虽然它也创建了一个重新分区主题:slot-service-slots-repartition。这两个主题似乎具有完全相同的消息(键和值)。我的理解是,如果没有对流进行密钥修改操作,则不需要重新分区。我在这里遗漏了什么吗?

更新: 这可能不再需要,因为 sobychacko 已经提供了解释,但是我确实尝试过没有如下所示的云流绑定并且它没有创建重新分区主题:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfiguration {

    @Bean
    KafkaTemplate<String, DomainEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    ProducerFactory<String,DomainEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(config());
    }

    private Map<String, Object> config() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    StreamsConfig streamsConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "slot-service");
        return new StreamsConfig(config);
    }

    @Bean
    KTable<String, Slot> kTable(KStreamBuilder builder) {
        Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class);
        Serde<Slot> slotSerde = new JsonSerde<>(Slot.class);

        return
                builder
                .stream(Serdes.String(), domainEventSerde, Repository.SLOT_EVENTS)
                .groupByKey(Serdes.String(), domainEventSerde)
                .aggregate(
                    Slot::new, 
                    (s, domainEvent, slot) -> slot.handle(domainEvent),
                    slotSerde,
                    Repository.SNAPSHOTS_FOR_SLOTS);
    }

    }

另外,制作人如下:

@Autowired
    public Repository(KafkaTemplate<String, DomainEvent> kafkaTemplate, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        this.kafkaTemplate = kafkaTemplate;
        this.kStreamBuilderFactoryBean = kStreamBuilderFactoryBean;
    }

    public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(
            domainEvent -> kafkaTemplate.send(SLOT_EVENTS, domainEvent.aggregateUUID().toString(),domainEvent) 
        );
        slot.flushEvents();
    }

更新 2:

这里是带云流的生产者代码:

public void save(Slot slot) {
        List<DomainEvent> newEvents = slot.getDirtyEvents();
        newEvents.forEach(domainEvent -> channels.eventsOutputChannel().send(MessageBuilder.withPayload(domainEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, slot.getUuid().toString()).build()));
        slot.flushEvents();
    }

有一个 map() 操作发生在我们执行入站反序列化的方法调用之前(我假设在上面的示例中禁用了本机反序列化)。正如 Matthias 指出的那样,如果有一个 map() 操作,它会设置一个标志,并在随后的 groupByKey() 中创建一个重新分区主题。因此,这可能是您遇到的情况,因为框架会在入站消息转换过程中为您执行此 map 操作。如果你真的想避免创建这个重新分区主题,你可以启用 nativeDecoding 然后使用 Kafka 提供的 Serde 。这样框架就不会调用 map 操作。问题是您的代码中使用的 JsonSerde 不容易用作 Spring Cloud Stream 中的 Serde 属性,因为它需要 class 信息。在下一版本的 Spring Cloud Stream 中,我们将改善这种情况。同时,您可以提供自定义 Serde。希望这可以帮助。