Spring 云函数函数接口return success/failure 处理

Spring cloud function Function interface return success/failure handling

我目前有一个spring云流应用,它有一个监听器功能,主要监听某个主题并依次执行以下内容:

  1. 消费来自主题的消息
  2. 在数据库中存储消费消息
  3. 调用外部服务获取一些信息
  4. 处理数据
  5. 在数据库中记录结果
  6. 将消息发送到另一个主题
  7. 确认消息(我将确认模式设置为手动)

我们决定转向 Spring 云功能,我已经能够使用 Function 界面完成上述几乎所有步骤,将源主题作为输入,将主题作为输出。

@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
    return message -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        return MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
    }
}

我的问题与步骤 7 中的异常处理有关(确认消息)。只有当我们确定消息已成功发送到接收器队列时,我们才会确认该消息,否则我们不会确认该消息。

我的问题是,如何在Spring云函数中实现这样的东西,特别是发送方法完全依赖于Spring框架(作为函数接口实现的结果评价)。

早些时候,我们可以通过 try/catch

@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
    try {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message);
        
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
    }catch (Exception exception){
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    }
}

是否有Function接口成功返回后触发的监听器,类似KafkaSendCallback但没有指定模板

Spring 云流对功能一无所知。它只是与以前相同的消息处理程序,因此与您之前使用的回调方法相同的方法也适用于函数。所以也许你可以分享一些代码来澄清你的意思?我也不明白 ..send 方法完全依赖于 Spring 框架是什么意思..

基于 Oleg 上面提到的内容,如果您想严格恢复 StreamListener 代码中的行为,您可以尝试以下操作。您可以切换到消费者,然后使用 KafkaTemplate 像以前一样在出站发送,而不是使用函数。

@Bean
public Consumer<Message<NotificationMessage>> validatedProducts() {
return message -> {
  try{
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback. 
       //only ack if the data was sent successfully. 
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        
  }
  catch (Exception exception){
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    }
  };

}

另一件值得研究的事情是使用 Kafka 事务,在这种情况下,如果它不能端到端工作,则不会发生确认。 Spring Cloud Stream binder 基于 Spring 中针对 Apache Kafka 的基础对此提供支持。更多详细信息 here. Here 是关于此的 Spring Cloud Stream 文档。

好吧,所以我选择的实际上是不为此使用 KafkaTemplate(或 streamBridge)。虽然这是一个可行的解决方案,但这意味着我的 Function 将被拆分为 Consumer 和某种即兴提供的(在本例中为 KafkaTemplate)。

因为我想坚持功能接口的设计目标,所以我在 ProducerListener 接口实现中隔离了数据库更新的行为

@Configuration
public class ProducerListenerConfiguration {
    private final MongoTemplate mongoTemplate;

    public ProducerListenerConfiguration(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @Bean
    public ProducerListener myProducerListener() {
        return new ProducerListener() {
            @SneakyThrows
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                final ValidatedEvent event = new ObjectMapper().readerFor(ValidatedEvent.class).readValue((byte[]) producerRecord.value());
                final var updateResult = updateDocumentProcessedState(event.getKey(), event.getPayload().getVersion(), true);
            }

            @SneakyThrows
            @Override
            public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
                ProducerListener.super.onError(producerRecord, recordMetadata, exception);
            }
        };
    }

    public UpdateResult updateDocumentProcessedState(String id, long version, boolean isProcessed) {
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        Update update = new Update();
        update.set("processed", isProcessed);
        update.set("version", version);
        return mongoTemplate.updateFirst(query, update, ProductChangedEntity.class);
    }
}

然后每次成功尝试,数据库都会更新处理结果和更新后的版本号。