如何使用 Spring Cloud Stream Kafka 在微服务事件溯源架构中查询事件存储库
How to query the event repository in a microservice Event Sourcing architecture with Spring Cloud Stream Kafka
澄清:请注意,这个问题与这个问题不同:
这个是关于使用 Kafka as the only repository (of events), no DB needed, The other one is about using a Database (MariaDB) per service + Kafka。
我想实现一个事件溯源架构来处理分布式事务:
OrdersService <------------> | Kafka Event Store | <------------>PaymentsService
subscribe/ subscribe/
find find
OrdersService 接收订单请求并将新订单存储在代理中。
private OrderBusiness orderBusiness;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//do whatever
//Publish the new Order with state = pending
order.setState(PENDING);
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
这是我的主要疑问:如何查询 Kafka 代理?假设我想按 user/date、州等
搜索订单
简短回答: 您无法查询代理,但您可以利用 Kafka 的 Streams API 和 "Interactive Queries".
长答案: 读取 Kafka 主题的访问模式是线性扫描而不是随机查找。当然,您也可以通过#seek()
随时重新定位,但只能通过offset或time。此外,主题被分片成分区,数据(默认情况下)按键散列分区(数据模型是 key-value 对)。于是就有了key的概念。
不过,你可以使用Kafka的Streams API that allows you to build an app that hold the current state -- base on a Kafka topics that is the ground truth -- as a materialized view (basically a cache). "Interactive Queries"让你查询这个物化视图。
详情请看这两篇博文post:
澄清:请注意,这个问题与这个问题不同:
这个是关于使用 Kafka as the only repository (of events), no DB needed, The other one is about using a Database (MariaDB) per service + Kafka。
我想实现一个事件溯源架构来处理分布式事务:
OrdersService <------------> | Kafka Event Store | <------------>PaymentsService
subscribe/ subscribe/
find find
OrdersService 接收订单请求并将新订单存储在代理中。
private OrderBusiness orderBusiness;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//do whatever
//Publish the new Order with state = pending
order.setState(PENDING);
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
这是我的主要疑问:如何查询 Kafka 代理?假设我想按 user/date、州等
搜索订单简短回答: 您无法查询代理,但您可以利用 Kafka 的 Streams API 和 "Interactive Queries".
长答案: 读取 Kafka 主题的访问模式是线性扫描而不是随机查找。当然,您也可以通过#seek()
随时重新定位,但只能通过offset或time。此外,主题被分片成分区,数据(默认情况下)按键散列分区(数据模型是 key-value 对)。于是就有了key的概念。
不过,你可以使用Kafka的Streams API that allows you to build an app that hold the current state -- base on a Kafka topics that is the ground truth -- as a materialized view (basically a cache). "Interactive Queries"让你查询这个物化视图。
详情请看这两篇博文post: