KSQL一致性
KSQL consistency
我正在使用 dotnet 和 ksql 进行 PoC。
https://github.com/pablocastilla/kafkiano/
总体思路是看我能不能用KSQL来实现业务逻辑。在示例中,我介绍了库存中的设备并从中下订单。这个例子包括:
两大主流:
- 库存流接收库存添加事件。
- 订单流接收产品订单。
我用这些流创建了两个 tables:
- ProductStock:它只是将产品添加到库存中
- 订单:按产品统计订单
在这两个 table 之后,我创建了另一个 table,其中包含订单和库存产品之间的差异,只是为了知道是否还有剩余产品。
通过加入最后一个 table 和订单流,我可以在处理该订单时留下库存。
我正在介绍使用产品名称作为关键字的事件。到目前为止它在我的机器上运行良好,但我的问题是:
这在大型生产环境中是否一致?我想知道当并行接收大量事件时一致性何时被破坏的限制。
我如何知道哪些查询先于其他查询执行?在将差异加入订单流之前,我需要计算库存和订单之间的差异
谢谢
KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;
// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from ORDERSCREATEDSTREAM group by ProductName;
// join with the difference
CREATE TABLE StockByProductTable AS SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;
//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;
我复制并粘贴了从 confluent 团队得到的github answer:
“我明白你的问题了。对你的问题的一个最低限度的回答是,一旦你的消息在流中可用,它就会执行。
一个很好的类比是一台总是 运行 运转的机器。
每当有效负载进入内部时,它都会对其进行处理。现在轮到你了。您是否在处理后将一些有效负载插入到新的记录流中?那么是的,你可以称它为'chaining'。一旦你 运行 / 执行 CTAS/CSAS 语句你会看到类似 'Table/Stream created and Running' 的东西,这正是它的意思。
你已经点燃了一个总是 运行ning 查询!"
我正在使用 dotnet 和 ksql 进行 PoC。 https://github.com/pablocastilla/kafkiano/
总体思路是看我能不能用KSQL来实现业务逻辑。在示例中,我介绍了库存中的设备并从中下订单。这个例子包括:
两大主流:
- 库存流接收库存添加事件。
- 订单流接收产品订单。
我用这些流创建了两个 tables:
- ProductStock:它只是将产品添加到库存中
- 订单:按产品统计订单
在这两个 table 之后,我创建了另一个 table,其中包含订单和库存产品之间的差异,只是为了知道是否还有剩余产品。
通过加入最后一个 table 和订单流,我可以在处理该订单时留下库存。
我正在介绍使用产品名称作为关键字的事件。到目前为止它在我的机器上运行良好,但我的问题是:
这在大型生产环境中是否一致?我想知道当并行接收大量事件时一致性何时被破坏的限制。
我如何知道哪些查询先于其他查询执行?在将差异加入订单流之前,我需要计算库存和订单之间的差异
谢谢
KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;
// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from ORDERSCREATEDSTREAM group by ProductName;
// join with the difference
CREATE TABLE StockByProductTable AS SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;
//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;
我复制并粘贴了从 confluent 团队得到的github answer:
“我明白你的问题了。对你的问题的一个最低限度的回答是,一旦你的消息在流中可用,它就会执行。
一个很好的类比是一台总是 运行 运转的机器。 每当有效负载进入内部时,它都会对其进行处理。现在轮到你了。您是否在处理后将一些有效负载插入到新的记录流中?那么是的,你可以称它为'chaining'。一旦你 运行 / 执行 CTAS/CSAS 语句你会看到类似 'Table/Stream created and Running' 的东西,这正是它的意思。
你已经点燃了一个总是 运行ning 查询!"