如何正确地用quarkus和smallrye做事件驱动的微服务

How to do Event-Driven Microservices with quarkus and smallrye correctly

亲爱的, 我正在尝试做某种事件驱动的微服务。目前,我能够使用来自 Kafka 的消息并在使用 Quarkus & Smallrye-Reactive 消息传递扩展收到消息时更新数据库记录。我想进一步实现的是能够在成功的情况下向其他主题发送消息,否则向错误主题发送消息。我知道我们可以使用 return 和 @outgoing 注释来发出新消息,但我认为它不适合我的用例。如果在使用消息时发生错误,我在这里需要指导。我应该 return 向原始主题发送消息(通过不确认消息)还是应该使用它并向不同主题生成错误消息以回滚原始事务。

这是我的代码:

@Incoming("new-payment")
public void newMessage(String msg) {
    LOG.info("New payment has been received.");
    LOG.info("Payload is {}", msg);
    PaymentEvent pe = jsob.fromJson(msg, PaymentEvent.class);
    mysqlPool.preparedQuery("select totalBuyers from Book where isbn = ? ",
                    Tuple.of(pe.getIsbn()))
            .thenApply(rs -> {
                RowIterator<Row> iterator = rs.iterator();
                if (iterator.hasNext()) {
                    return iterator.next().getInteger(0) + 1;
                } else {
                    return Integer.valueOf(0);
                }
            })
            .thenApply(totalCount -> {
                return mysqlPool.preparedQuery("update Book set totalBuyers = ?",
                        Tuple.of(totalCount));
            })
            .whenComplete((rs, err) -> {
                if (err != null) {
                    //Emit an error to error topic.
                } else {
                    //Emit a msg to other service.
                }
            });
}

另外,如果你有更好的代码请提交,我在反应式编程方面还是新手:)。

多年来我一直从事企业集成工作,我认为您会希望两者兼顾。

Should I return message to the original topic (by not acknowledging the message) or should I consume it and produce error message to different topic to rollback the original transaction.

该事件应保留在主题上,以供另一个实例可能提取和处理。错误消息应作为事件记录。也许同一个消费者可以成功地接收并重新处理事件。

EDA(事件驱动架构)可能会提供不同的方法来处理此问题,但在 ESB 上,消息将被标记为已尝试。通常三次尝试会将其发送到死信队列,以便稍后可以更正和重新处理。

我们的企业也开始使用 EDA 设计和构建应用程序,所以我有兴趣阅读其他人对这个问题的看法。感谢您专注于 Quarkus。我相信这是我见过的来自 Redhat 的最好的技术之一!

这种方法的另一个问题是你正在做“1 项服务中的 2 次写入”,例如一个调用数据库,另一个调用主题。当 2 次写入之一失败时,这可能会成为问题。

如果您想避免这种情况并使用纯事件驱动的方法,那么您需要重新排序您的事件,以便写入数据库是整个流程中的最后一个事件,这样您就可以防止 2 次写入来自 1 项服务。

因此在您的情况下:将第二个 thenApply(..) 方法从更新数据库更改为将新事件触发到另一个主题。这个新主题的消费者应该进行数据库更新。于是流程变成了这样:

生产者 -> 主题 1 -> 消费者(select 来自...)并将事件触发到另一个主题 -> 主题 2 -> 消费者(更新 table)。