为 JSON 创建 kafka 流 API
Creating kafka stream API for JSON's
我正在尝试编写用于将 JSON 数组转换为 JSON 元素的 kafka 流代码...因为我是 kafka 流的新手,任何人都可以帮助我编写代码..就像 kstream 和 ktable 中应该有的一样..
我的输入流将采用以下格式
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
]
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
]
并且我的输出必须采用
形式
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
谁能帮我写代码??
在Python...
from kafka import KafkaConsumer
consumer = KafkaConsumer('topicName')
for message in consumer:
print(message)
在 KafkaConsumer 中指定 bootstrap_servers 参数。
为Java看cloudkarafka,真不错:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("msg = %s\n", record.value());
}
}
如果你想使用Kafka Streams,你可以使用flatMap()
。像
// using new 1.0 API
StreamsBuilder builder = new StreamsBuilder();
builer.stream("topic").flatMap(...).to("output-topic");
查看示例和文档了解更多详情:
我正在尝试编写用于将 JSON 数组转换为 JSON 元素的 kafka 流代码...因为我是 kafka 流的新手,任何人都可以帮助我编写代码..就像 kstream 和 ktable 中应该有的一样.. 我的输入流将采用以下格式
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
]
[
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""},
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
]
并且我的输出必须采用
形式{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":0,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":1,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359126933+05:30","data":2,"unit":""}
{"timestamp":"2017-10-24T12:44:09.359175426+05:30","data":3,"unit":""}
谁能帮我写代码??
在Python...
from kafka import KafkaConsumer
consumer = KafkaConsumer('topicName')
for message in consumer:
print(message)
在 KafkaConsumer 中指定 bootstrap_servers 参数。
为Java看cloudkarafka,真不错:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("msg = %s\n", record.value());
}
}
如果你想使用Kafka Streams,你可以使用flatMap()
。像
// using new 1.0 API
StreamsBuilder builder = new StreamsBuilder();
builer.stream("topic").flatMap(...).to("output-topic");
查看示例和文档了解更多详情: