How to filter out unnecessary records before materializing GlobalKTable?

With Kafka Streams, I always use the following code to initialize my store from a compacted reference topic:

builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

I would like to filter the topic before the store is materialized, in order to drop some merchants I do not need.

Something like this:

GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
         .map(MerchantAvro::getDeletionDate)
         .isPresent());
...

But the filter method cannot be applied to a GlobalKTable.

How can I do this filtering?

You need to filter the topic first and write the result into another topic. You can then consume that second topic as a GlobalKTable.
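A minimal sketch of that two-step approach, under some assumptions: the class `FilteredGlobalTable`, its method names, the generic value type `V` and the `keep` predicate are all hypothetical, keys are assumed to be Strings, and the filtered topic is assumed to already exist (ideally compacted). Reading the source as a KTable rather than a KStream is a design choice: as far as I know, a KTable filter forwards a delete (null value) for records that fail the predicate, so a merchant that later gets a deletion date would also disappear from the filtered topic.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical helper class, not part of Kafka Streams.
final class FilteredGlobalTable {

    // Step 1: read the source topic as a table, keep only the wanted records,
    // and write the result to a second topic (assumed to be created beforehand).
    static <V> Topology filterTopology(String sourceTopic, String filteredTopic,
                                       Serde<V> valueSerde, Predicate<String, V> keep) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(sourceTopic, Consumed.with(Serdes.String(), valueSerde))
               .filter(keep)
               .toStream()
               .to(filteredTopic, Produced.with(Serdes.String(), valueSerde));
        return builder.build();
    }

    // Step 2: materialize the already-filtered topic as a GlobalKTable.
    static <V> Topology globalTableTopology(String filteredTopic, String storeName,
                                            Serde<V> valueSerde) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(filteredTopic,
                Consumed.with(Serdes.String(), valueSerde),
                Materialized.<String, V, KeyValueStore<Bytes, byte[]>>as(storeName));
        return builder.build();
    }
}
```

With the types from the question, the predicate could be something like `(key, merchant) -> merchant.getDeletionDate() == null`, passed together with whatever Avro serde the application already uses.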

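As an alternative, you can use a "global store" instead of a GlobalKTable. In that case, you can provide a custom Processor that implements the filter before the global store is populated. See Defining a Stream Processor.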
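The global-store variant could look roughly like the sketch below. It assumes a Kafka Streams version that ships the `org.apache.kafka.streams.processor.api` package, String keys, and an in-memory store; the class names `FilteringGlobalStore` and `FilterProcessor` and the `keep` predicate are hypothetical. Global stores must be built with changelog logging disabled, since the source topic itself serves as the changelog.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Hypothetical helper class, not part of Kafka Streams.
final class FilteringGlobalStore {

    // Updates the global store, but only with records accepted by the predicate.
    static final class FilterProcessor<V> implements Processor<String, V, Void, Void> {
        private final String storeName;
        private final Predicate<String, V> keep;
        private KeyValueStore<String, V> store;

        FilterProcessor(String storeName, Predicate<String, V> keep) {
            this.storeName = storeName;
            this.keep = keep;
        }

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            store = context.getStateStore(storeName);
        }

        @Override
        public void process(Record<String, V> record) {
            V value = record.value();
            if (value != null && keep.test(record.key(), value)) {
                store.put(record.key(), value);
            } else {
                // Tombstone or rejected record: make sure no stale entry remains.
                store.delete(record.key());
            }
        }
    }

    static <V> Topology topology(String topic, String storeName,
                                 Serde<V> valueSerde, Predicate<String, V> keep) {
        // An explicitly typed supplier avoids overload ambiguity in versions
        // that still carry the older ProcessorSupplier overload.
        ProcessorSupplier<String, V, Void, Void> supplier =
                () -> new FilterProcessor<>(storeName, keep);
        StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(storeName), Serdes.String(), valueSerde)
                      .withLoggingDisabled(),
                topic,
                Consumed.with(Serdes.String(), valueSerde),
                supplier);
        return builder.build();
    }
}
```

One thing worth checking for your version: in some older releases the custom processor was reportedly bypassed while the global store was being restored from the topic, in which case the filter would not apply to restored data.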

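Global stores are also local. The difference is that for a "regular store" the data is partitioned, i.e., each store holds different data, whereas for a global store each instance loads all of the data (i.e., the data is replicated). Thus, each member of the group has its own copy of the global store data.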

I wrote a "streamer" that transforms persons into customers, like this. Here is the topology:

    journal.info("Open topic {}...", kafkaTopic);
    // Pure Processor API topology: source -> filtering/mapping processor -> sink.
    // No StreamsBuilder is needed when only the Processor API is used.
    Topology topology = new Topology();
    topology.addSource("person$", kafkaTopic)
            .addProcessor("selection", PersonProcessor::new, "person$")
            .addSink("customer$", customerTopic, "selection");
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    // Close the streams instance cleanly when the JVM shuts down
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));

Here is the processor:

public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
    private static final Logger journal = LoggerFactory.getLogger(PersonProcessor.class);

    @Override
    public void process(String key, PersonAvro avroPerson) {
        // A null value is a tombstone on a compacted topic: nothing to convert,
        // and dereferencing it in the log statement would throw a NullPointerException
        if (avroPerson == null) {
            journal.debug("Skipping record with null value for key {}", key);
            return;
        }
        journal.debug("Processing object: {}, {}", key, avroPerson.getActive());
        Optional.of(avroPerson)
                // Keep only persons whose "active" flag is set and is not "0"
                .filter(person -> Optional.ofNullable(person.getActive())
                        .filter(activation -> !"0".equals(activation))
                        .isPresent())
                // Map the person to a customer
                .map(person -> CustomerAvro.newBuilder()
                        .setId(person.getId())
                        .setCompName(person.getCompName())
                        .setSiretCode(person.getSiretCode())
                        .setActive(person.getActive())
                        .setAdd3(person.getAdd3())
                        .setAdd4(person.getAdd4())
                        .setAdd5(person.getAdd5())
                        .setAdd6(person.getAdd6())
                        .setAdd7(person.getAdd7()))
                .map(CustomerAvro.Builder::build)
                .ifPresent(customer -> {
                    // Forward to the sink and request a commit
                    context().forward(key, customer);
                    context().commit();
                });
    }
}

Another streamer loads the local store from a GlobalKTable:

@PostConstruct
private void init() throws InterruptedException {
    configurer();
    journal.info("Open topic {}...", kafkaTopic);
    StreamsBuilder builder = new StreamsBuilder();
    builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
    customerStore = waitUntilStoreIsQueryable("customerStore", streams);
    Runtime.getRuntime()
            .addShutdownHook(new Thread(streams::close));
}
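
The `waitUntilStoreIsQueryable` helper is not shown above. One plausible way to write it is sketched below; this is a guess at its behavior, not the original code. It assumes a Kafka Streams version that provides `StoreQueryParameters`, and the `StoreAccess` class name and the extra `timeout` parameter are my own additions so that the loop cannot spin forever.

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical helper class, not part of Kafka Streams.
final class StoreAccess {

    // Polls until the named key-value store can be queried, or rethrows the
    // last InvalidStateStoreException once the timeout has elapsed.
    static <K, V> ReadOnlyKeyValueStore<K, V> waitUntilStoreIsQueryable(
            String storeName, KafkaStreams streams, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        storeName, QueryableStoreTypes.<K, V>keyValueStore()));
            } catch (InvalidStateStoreException notReadyYet) {
                // The store is not available yet (for example, still starting up)
                if (System.nanoTime() >= deadline) {
                    throw notReadyYet;
                }
                Thread.sleep(100);
            }
        }
    }
}
```

The returned store is read-only, which is all that is needed for lookups by key.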

It is then able to answer synchronous requests:

public Optional<CustomerDto> getClient(int idCoclico) {
    journal.debug(Markers.append("idCoclico", idCoclico), "Looking up a COCLICO customer");
    // Look up the customer in the local store (get returns null when the key is absent)
    Optional<CustomerDto> optClient = Optional.ofNullable(customerStore.get(String.valueOf(idCoclico)))
            .map(avroCustomer -> {
                journal.debug(Markers.append("idCoclico", idCoclico),
                        "The customer exists in the local store and is not inactive");
                CustomerDto client = new CustomerDto(avroCustomer.getId());
                client.setCompName(avroCustomer.getCompName());
                client.setSiretCode(avroCustomer.getSiretCode());
                client.setAdd3(avroCustomer.getAdd3());
                client.setAdd4(avroCustomer.getAdd4());
                client.setAdd5(avroCustomer.getAdd5());
                client.setAdd6(avroCustomer.getAdd6());
                client.setAdd7(avroCustomer.getAdd7());
                Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
                        .map(String::trim)
                        .filter(add -> !add.isEmpty());
                if (optAdd.isPresent()) {
                    // The address is filled in in COCLICO: use its country code
                    client.setCountryCode(avroCustomer.getCountryCode());
                } else {
                    // French addresses are not filled in: default to France
                    client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
                }
                return client;
            });
    if (!optClient.isPresent()) {
        journal.info(Markers.append("idCoclico", idCoclico), "The customer does not exist in the local store");
    }
    return optClient;
}

The first tests are OK. I will now try to deploy it in the build environment...