如何一次从一个 KafkaSpout 发送多个(不同的)元组到螺栓?
How to send multiple (different) tuples from one KafkaSpout at once to the bolt?
我是 Apache Storm 的新手。
我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。
为此,我有一个 KafkaSpout,它将向 Bolts(它有我的 CEP 查询)发送流来过滤流。
我已经创建了一个拓扑,我正在尝试 运行 在本地集群上
问题是我的螺栓中的 CEP 查询 运行ning 需要成批的元组才能对流执行 window 操作。在我的拓扑中,KafkaSpout 一次只发送一个元组给 Bolts 进行处理。所以我的 CEP 查询没有按预期工作。
我在 Storm 中使用默认的 KafkaSpout。有什么方法可以一次向 Bolt 发送多个不同的元组?一些配置调整可以做到这一点,还是我需要为此制作自定义 KafkaSpout?
请帮忙!!
我的拓扑:
TopologyBuilder 生成器 = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG , "weather-consumer-group").build()),4);
builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");
builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")
我正在使用 2 个螺栓和一个喷嘴。
我在 Bolt A 中的 esper 查询 运行ning 是
select first(e), last(e) from weatherEvent.win:length(3) as e
这里我试图从事件流中获取长度为 3 的 window 中的第一个和最后一个事件。但是我得到了相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。
spout 做不到,但你可以使用 Storm 的窗口支持https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只写一个聚合螺栓并将它放在 spout 和拓扑的其余部分之间。
所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection
。
我建议您尝试内置的窗口支持,但如果您更愿意编写自己的聚合,您的 bolt 实际上只需要接收一定数量的元组(例如 3),并发出一个新的元组包含所有值。
聚合器螺栓应该做类似
的事情
private List<Tuple> buffered;
execute(Tuple input) {
if (buffered.size != 2) {
buffered.add(input)
return
}
Tuple first = buffered.get(0)
Tuple second = buffered.get(1)
Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
List<Tuple> anchors = List.of(first, second, input)
collector.emit(anchors, aggregate)
collector.ack(first, second, input)
buffered.clear()
}
这样你最终得到一个包含 3 个输入元组内容的元组。
我是 Apache Storm 的新手。
我正在尝试使用 Apache Kafka、Storm 和 ESPER CEP 引擎开发一个实时流处理系统。
为此,我有一个 KafkaSpout,它将向 Bolts(它有我的 CEP 查询)发送流来过滤流。
我已经创建了一个拓扑,我正在尝试 运行 在本地集群上
问题是我的螺栓中的 CEP 查询 运行ning 需要成批的元组才能对流执行 window 操作。在我的拓扑中,KafkaSpout 一次只发送一个元组给 Bolts 进行处理。所以我的 CEP 查询没有按预期工作。
我在 Storm 中使用默认的 KafkaSpout。有什么方法可以一次向 Bolt 发送多个不同的元组?一些配置调整可以做到这一点,还是我需要为此制作自定义 KafkaSpout?
请帮忙!!
我的拓扑:
TopologyBuilder 生成器 = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout<>(KafkaSpoutConfig.builder("localhost:" + 9092, "weatherdata").setProp(ConsumerConfig.GROUP_ID_CONFIG , "weather-consumer-group").build()),4);
builder.setBolt("A", new FeatureSelectionBolt(), 2).globalGrouping("KafkaSpout");
builder.setBolt("B", new TrendDetectionBolt(), 2).shuffleGrouping("A")
我正在使用 2 个螺栓和一个喷嘴。
我在 Bolt A 中的 esper 查询 运行ning 是
select first(e), last(e) from weatherEvent.win:length(3) as e
这里我试图从事件流中获取长度为 3 的 window 中的第一个和最后一个事件。但是我得到了相同的第一个和最后一个事件,因为 KafkaSpout 一次只发送一个元组。
spout 做不到,但你可以使用 Storm 的窗口支持https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html,或者只写一个聚合螺栓并将它放在 spout 和拓扑的其余部分之间。
所以你的拓扑应该是spout -> aggregator -> feature selection -> trend detection
。
我建议您尝试内置的窗口支持,但如果您更愿意编写自己的聚合,您的 bolt 实际上只需要接收一定数量的元组(例如 3),并发出一个新的元组包含所有值。
聚合器螺栓应该做类似
的事情private List<Tuple> buffered;
execute(Tuple input) {
if (buffered.size != 2) {
buffered.add(input)
return
}
Tuple first = buffered.get(0)
Tuple second = buffered.get(1)
Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues())
List<Tuple> anchors = List.of(first, second, input)
collector.emit(anchors, aggregate)
collector.ack(first, second, input)
buffered.clear()
}
这样你最终得到一个包含 3 个输入元组内容的元组。