KSQL一致性

KSQL consistency

我正在使用 dotnet 和 ksql 进行 PoC。 https://github.com/pablocastilla/kafkiano/

总体思路是看我能不能用KSQL来实现业务逻辑。在示例中,我介绍了库存中的设备并从中下订单。这个例子包括:

两大主流:

我用这些流创建了两个 tables:

在这两个 table 之后,我创建了另一个 table,其中包含订单和库存产品之间的差异,只是为了知道是否还有剩余产品。

通过加入最后一个 table 和订单流,我可以在处理该订单时留下库存。

我正在介绍使用产品名称作为关键字的事件。到目前为止它在我的机器上运行良好,但我的问题是:

谢谢

KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');

//TABLE GROUPING BY PRODUCT
CREATE TABLE  ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;

// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from  ORDERSCREATEDSTREAM group by ProductName;

// join with the difference
CREATE TABLE StockByProductTable  AS  SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;

//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN  StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;

我复制并粘贴了从 confluent 团队得到的github answer

“我明白你的问题了。对你的问题的一个最低限度的回答是,一旦你的消息在流中可用,它就会执行。

一个很好的类比是一台总是 运行 运转的机器。 每当有效负载进入内部时,它都会对其进行处理。现在轮到你了。您是否在处理后将一些有效负载插入到新的记录流中?那么是的,你可以称它为'chaining'。一旦你 运行 / 执行 CTAS/CSAS 语句你会看到类似 'Table/Stream created and Running' 的东西,这正是它的意思。

你已经点燃了一个总是 运行ning 查询!"