为 JSON 创建 kafka 流 API

Creating kafka stream API for JSON's

我正在尝试编写用于将 JSON 数组转换为 JSON 元素的 kafka 流代码...因为我是 kafka 流的新手,任何人都可以帮助我编写代码..就像 kstream 和 ktable 中应该有的一样.. 我的输入流将采用以下格式

[
 {"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""},
 {"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
]

[
 {"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""},
 {"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
]

并且我的输出必须采用

形式
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}

谁能帮我写代码??

在Python...

from kafka import KafkaConsumer
consumer = KafkaConsumer('topicName')
for message in consumer:
  print(message)

在 KafkaConsumer 中指定 bootstrap_servers 参数。

为Java看cloudkarafka,真不错:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("msg = %s\n", record.value());
    }
}

如果你想使用Kafka Streams,你可以使用flatMap()。像

// using new 1.0 API
StreamsBuilder builder = new StreamsBuilder();
builer.stream("topic").flatMap(...).to("output-topic");

查看示例和文档了解更多详情: