使用 KafkaStreams 处理主题时如何从主题中获取上一条消息
How to obtain the previous message from a topic when processing it with KafkaStreams
我一直在使用 covid19api 保存的数据实施 Kafka producers/consumers 和流。
我正在尝试从例如端点 https://api.covid19api.com/all 中提取每天的每日案例。然而,这项服务——以及来自这个 API 的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和康复病例),但不是累积的,而是最终的每日病例,我正在努力实现的目标。
使用 transformValues 和 StoreBuilder(推荐 )对我也不起作用,因为场景不同。我使用 transformValue 功能实现了一些不同的东西,但每次 检索到的 previous 值都是主题的第一个值,而不是实际的 previous:
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
} else {
state.put(key, value);
}
return value;
}
¿当我使用流处理主题时,如何从主题中获取上一条消息?任何帮助或建议将不胜感激。
此致。
问题不只是因为您只是将每个键获得的第一个值存储在状态存储中吗?如果在每条后续消息上您总是想要上一条消息,那么您需要始终将当前消息存储在状态存储中作为最后一步,例如:
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
}
// Always update the state store:
state.put(key, value);
return value;
}
我一直在使用 covid19api 保存的数据实施 Kafka producers/consumers 和流。
我正在尝试从例如端点 https://api.covid19api.com/all 中提取每天的每日案例。然而,这项服务——以及来自这个 API 的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和康复病例),但不是累积的,而是最终的每日病例,我正在努力实现的目标。
使用 transformValues 和 StoreBuilder(推荐
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
} else {
state.put(key, value);
}
return value;
}
¿当我使用流处理主题时,如何从主题中获取上一条消息?任何帮助或建议将不胜感激。
此致。
问题不只是因为您只是将每个键获得的第一个值存储在状态存储中吗?如果在每条后续消息上您总是想要上一条消息,那么您需要始终将当前消息存储在状态存储中作为最后一步,例如:
@Override
public String transform(Long key, String value) {
String prevValue = state.get(key);
log.info("{} => {}", key, value) ;
if (prevValue != null) {
Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);
log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());
dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());
String newDto = new Gson().toJson(dto);
log.info("New value {}", newDto);
return newDto;
}
// Always update the state store:
state.put(key, value);
return value;
}