How to filter out unnecessary records before materializing a GlobalKTable?
With Kafka Streams, I always initialize my store from a reference compacted topic with the following code:
builder.globalTable(kafkaTopic, Materialized.as("storeMerchant"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
I would like to filter the topic before the store is built, to weed out some unnecessary merchants.
Something like this:
GlobalKTable<String, MerchantAvro> merchant$ = builder.globalTable(kafkaTopic);
merchant$.filter((key, value) -> !Optional.ofNullable(value)
.map(MerchantAvro::getDeletionDate)
.isPresent());
...
But the filter method cannot be applied to a GlobalKTable.
How can this filtering be done?
You need to filter the topic first and write the result into another topic. You can then consume this second topic as a GlobalKTable.
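A minimal sketch of this two-topic approach (the "merchant-filtered" output topic name is an assumption, and the Avro Serdes are assumed to be configured as defaults in props). Note that tombstones (null values) are forwarded so deletions still propagate through the compacted topic:

StreamsBuilder builder = new StreamsBuilder();
// Keep tombstones and merchants without a deletion date; drop the rest.
builder.<String, MerchantAvro>stream(kafkaTopic)
        .filter((key, value) -> value == null || value.getDeletionDate() == null)
        .to("merchant-filtered");
// Consume the filtered topic as the GlobalKTable.
builder.globalTable("merchant-filtered", Materialized.as("storeMerchant"));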
As an alternative, you can use a "global store" instead of a GlobalKTable. For this case, you provide a custom Processor that can implement the filter before the global store is populated. See Defining a Stream Processor.
A global store is also "local". The difference is that for a "regular store" the data is partitioned, i.e., each store holds different data, while for a global store each instance loads all the data (i.e., the data is replicated). Hence, every member of the group has its own full copy of the global store's data.
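A minimal sketch of that alternative, using the same older Processor API (AbstractProcessor) as the code further down; the store name "merchantStore", the node names, and merchantSerde are assumptions for illustration:

Topology topology = new Topology();
topology.addGlobalStore(
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("merchantStore"),
                Serdes.String(), merchantSerde)
              .withLoggingDisabled(), // a global store must not have a changelog
        "merchant-source",
        Serdes.String().deserializer(), merchantSerde.deserializer(),
        kafkaTopic, "merchant-filter",
        () -> new AbstractProcessor<String, MerchantAvro>() {
            private KeyValueStore<String, MerchantAvro> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                super.init(context);
                store = (KeyValueStore<String, MerchantAvro>) context.getStateStore("merchantStore");
            }

            @Override
            public void process(String key, MerchantAvro value) {
                // The filter happens here, before the store is populated:
                // only merchants without a deletion date are kept.
                if (value != null && value.getDeletionDate() == null) {
                    store.put(key, value);
                } else {
                    store.delete(key); // honor tombstones and deleted merchants
                }
            }
        });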
I made a "Streamer" like this to transform persons into customers:
Here is the topology:
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
Topology topology = builder.build();
topology.addSource("person$", kafkaTopic)
.addProcessor("selection", PersonProcessor::new, "person$")
.addSink("customer$", customerTopic, "selection");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
Here is the processor:
import java.util.Optional;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersonProcessor extends AbstractProcessor<String, PersonAvro> {
Logger journal = LoggerFactory.getLogger(PersonProcessor.class);
@Override
public void process(String key, PersonAvro avroPerson) {
// avroPerson may be null (a tombstone on the compacted topic), so guard the log call
journal.debug("traitement objet: {}, {}", key,
Optional.ofNullable(avroPerson).map(PersonAvro::getActive).orElse(null));
Optional.ofNullable(avroPerson)
// keep only active persons ("0" marks an inactive one)
.filter(person -> Optional.ofNullable(person.getActive())
.filter(activation -> !activation.matches("0"))
.isPresent())
.map(person -> CustomerAvro.newBuilder()
.setId(person.getId())
.setCompName(person.getCompName())
.setSiretCode(person.getSiretCode())
.setActive(person.getActive())
.setAdd3(person.getAdd3())
.setAdd4(person.getAdd4())
.setAdd5(person.getAdd5())
.setAdd6(person.getAdd6())
.setAdd7(person.getAdd7()))
.map(CustomerAvro.Builder::build)
.ifPresent(customer -> {
context().forward(key, customer);
context().commit();
});
}
}
Another streamer that loads a local store from a GlobalKTable:
@PostConstruct
private void init() throws InterruptedException {
configurer();
journal.info("Open topic {}...", kafkaTopic);
StreamsBuilder builder = new StreamsBuilder();
builder.globalTable(kafkaTopic, Materialized.as("customerStore"));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
customerStore = waitUntilStoreIsQueryable("customerStore", streams);
Runtime.getRuntime()
.addShutdownHook(new Thread(streams::close));
}
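waitUntilStoreIsQueryable is not part of the Kafka Streams API; a common implementation, adapted from the Confluent examples, simply retries until the store becomes queryable (e.g., after a rebalance):

public static <T> ReadOnlyKeyValueStore<String, T> waitUntilStoreIsQueryable(
        String storeName, KafkaStreams streams) throws InterruptedException {
    while (true) {
        try {
            return streams.store(storeName, QueryableStoreTypes.keyValueStore());
        } catch (InvalidStateStoreException ignored) {
            // the store is not ready yet, retry shortly
            Thread.sleep(100);
        }
    }
}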
And it is able to answer synchronous requests:
public Optional<CustomerDto> getClient(int idCoclico) {
journal.debug(Markers.append("idCoclico", idCoclico), "Recherche d'un client COCLICO");
// Look up the client in the local store (cache)
Optional<CustomerDto> optClient = Optional.ofNullable(idCoclico)
.map(String::valueOf)
.map(customerStore::get)
.map(avroCustomer -> {
journal.debug(Markers.append("idCoclico", idCoclico),
"Le client existe dans le store local et n'est pas inactif");
CustomerDto client = new CustomerDto(avroCustomer.getId());
client.setCompName(avroCustomer.getCompName());
client.setSiretCode(avroCustomer.getSiretCode());
client.setAdd3(avroCustomer.getAdd3());
client.setAdd4(avroCustomer.getAdd4());
client.setAdd5(avroCustomer.getAdd5());
client.setAdd6(avroCustomer.getAdd6());
client.setAdd7(avroCustomer.getAdd7());
Optional<String> optAdd = Optional.ofNullable(avroCustomer.getAdd7())
.map(String::trim)
.filter(add -> !add.isEmpty());
// If the address is present in COCLICO
if (optAdd.isPresent())
client.setCountryCode(avroCustomer.getCountryCode());
// French addresses are not filled in, so default to the France country code
else
client.setCountryCode(fr.laposte.bscc.encaissement.Constantes.CODE_PAYS_FRANCE);
return client;
});
if (!optClient.isPresent())
journal.info(Markers.append("idCoclico", idCoclico), "Le client n'existe pas dans le store local");
return optClient;
}
The first tests are OK. I will try to deploy it in the build environment...