Kafka Streams - KSQL - Split messages and publish to another topic

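Is there a way to split one message into multiple messages using KSQL and publish them to a new topic? To be clear, I am not looking for a Java-based listener that iterates over or streams the records to a new topic; I am looking for a KSQL statement that does this for me.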

For example:

Let's say I need to split each message in the invoice topic into item_inventory_delta messages.

invoice topic

key: saleschecknumber

Sample message:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta topic

key: saleschecknumber_itemID

Sample messages:

1.

{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}

For a KStream application, you can use flatMap, which accepts a function that takes a record and returns an iterable of zero or more records:

case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)

// initialize the stream 
val inputStream: KStream[String, Record] = ??? 

// split the message
inputStream.flatMap { case (key, record) => 
  record.items.map(item => (key, item) )
}
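
To make the one-to-many mapping concrete, here is a minimal, dependency-free Java sketch of the same split. It is plain Java rather than the Kafka Streams API, and the names InvoiceSplitter, Item, split, and the sales check number "SC-1" are hypothetical, chosen only for illustration. It also builds the composite key saleschecknumber_itemID that the question asks for, which the Scala snippet above does not do (it reuses the original key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class InvoiceSplitter {

    /** One line item of an invoice, mirroring an element of the "items" array. */
    static final class Item {
        final int itemId;
        final int quantity;

        Item(int itemId, int quantity) {
            this.itemId = itemId;
            this.quantity = quantity;
        }
    }

    /**
     * Splits one invoice into one (key, item) pair per line item.
     * Each output key is "<saleschecknumber>_<itemId>".
     * An invoice with no items yields an empty list, i.e. zero output records.
     */
    static List<Map.Entry<String, Item>> split(String salesCheckNumber, List<Item> items) {
        List<Map.Entry<String, Item>> out = new ArrayList<>();
        for (Item item : items) {
            out.add(Map.entry(salesCheckNumber + "_" + item.itemId, item));
        }
        return out;
    }

    public static void main(String[] args) {
        // The sample invoice from the question, under a made-up sales check number.
        List<Item> items = List.of(new Item(123, 1), new Item(345, 5));
        for (Map.Entry<String, Item> record : split("SC-1", items)) {
            System.out.println(record.getKey()
                    + " -> itemId=" + record.getValue().itemId
                    + ", quantity=" + record.getValue().quantity);
        }
    }
}
```

In a real Kafka Streams application, the body of split would be what you return from the function passed to flatMap, with the serialization handled by your serdes.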

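As I understand it, there are many ways to handle this, because the task is about how each incoming message is processed rather than about aggregating messages. A simple option is the Kafka Streams Processor API, which lets you write custom processing logic.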

Kafka Stream Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic

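Note: you do not have to define the output value, so I am just publishing the same key and value, but you can choose to define your own output key and value.

You can define the topology with the Kafka Streams Processor API as follows: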

Topology builder = new Topology();
builder.addSource("Source", "invoice")
       .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
       .addSink("sinkDeltaInvoice", "item_inventory_delta",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "sourceProcessor");

Below is the custom processor. Note that it only outlines the approach; it is not a complete implementation:

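import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

class InvoiceProcessor implements Processor<String, String> {
    private Gson gson = new Gson();

    //constructor
    .......
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Any code for clean up would go here. This processor instance will not be used
        // again after this call.
    }

    @Override
    public void process(String key, String value) {
        try {
            // Inventory is a custom class that maps the invoice JSON;
            // it has a List<Item> member named items.
            // The invoice JSON is in the record value, not in the key.
            Inventory inventory = gson.fromJson(value, Inventory.class);

            // Forward one record per item to the sink node.
            for (Item item : inventory.getItems()) {
                context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
            }
        } catch (JsonSyntaxException e) {
            // Malformed invoice JSON: nothing is forwarded for this record.
        }
    }
}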

As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.

Given a topic invoice with a JSON payload as in your example, first inspect the topic using PRINT to dump its contents:

ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}

Then declare a schema on top of the topic, which gives us a ksqlDB stream:

CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');

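This only "registers" the existing topic for use with ksqlDB. No new Kafka topic is written until the next step.

Create a new Kafka topic, populated continuously from the messages arriving in the source stream: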

CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;

The new topic has been created:

ksql> SHOW TOPICS;

 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1

The topic has the delta messages as requested :)

ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}