(Kafka) 喷出风暴返回事物列表,无法将此列表传播到螺栓
(Kafka) Spout on storm returning a list of things, can't spread this list to bolts
我是 Storm 的新手,所以请多多包涵。
我有一个服务,每次我们用结果列表.
调用时,它会向kafka发送一条消息
有一个KafkaSpout读取每条消息,这条消息包含前面提到的列表。
只是一个JSon,我可以打开它。现在,问题来了:
我在方案中使用 jackson 执行此转换操作,但此方案可以 return 基本上是一个 Values
对象,它不是对象列表,而是字段值对的列表。
另一件事可能是在 Bolt 中获取此值(它只是一个扩展的 ArrayList)对象并将其解包为单个元素并将它们中的每一个发送到下一个 Bolt。这是一个解决方案吗?我可以通过对 Bolt 的一次调用发出多个对象吗?
有没有更聪明的解决方案?
是的,您可以从单个 Bolt 发出多个元组。查看位于 here
的示例螺栓中的执行方法
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); //emits multiple tuples
}
_collector.ack(tuple);
}
如您所见,for 循环可以同时发出多个元组。这样做会创建一个更大的消息树。这可能会导致问题,具体取决于您的可靠性保证和数据大小。
根据我的经验,hard/impossible 操作 KafkaSpout 中的数据。关于您的设置的一些注意事项。
- 我要做的第一件事是更改服务发送给 Kafka 的内容。你能把它放在单独的项目而不是一个大项目中发送吗?如果您无法更改它。
- 像您提到的那样设置多个螺栓,
Spout => UnwrapBolt => ProcessBolt
,其中 UnwrapBolt
获取您的一个数据源并将它们作为单独的元组发出,然后 ProcessBolt
获取每个单独的元组并处理随心所欲。
我是 Storm 的新手,所以请多多包涵。
我有一个服务,每次我们用结果列表.
调用时,它会向kafka发送一条消息有一个KafkaSpout读取每条消息,这条消息包含前面提到的列表。
只是一个JSon,我可以打开它。现在,问题来了:
我在方案中使用 jackson 执行此转换操作,但此方案可以 return 基本上是一个 Values
对象,它不是对象列表,而是字段值对的列表。
另一件事可能是在 Bolt 中获取此值(它只是一个扩展的 ArrayList)对象并将其解包为单个元素并将它们中的每一个发送到下一个 Bolt。这是一个解决方案吗?我可以通过对 Bolt 的一次调用发出多个对象吗?
有没有更聪明的解决方案?
是的,您可以从单个 Bolt 发出多个元组。查看位于 here
的示例螺栓中的执行方法public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); //emits multiple tuples
}
_collector.ack(tuple);
}
如您所见,for 循环可以同时发出多个元组。这样做会创建一个更大的消息树。这可能会导致问题,具体取决于您的可靠性保证和数据大小。
根据我的经验,hard/impossible 操作 KafkaSpout 中的数据。关于您的设置的一些注意事项。
- 我要做的第一件事是更改服务发送给 Kafka 的内容。你能把它放在单独的项目而不是一个大项目中发送吗?如果您无法更改它。
- 像您提到的那样设置多个螺栓,
Spout => UnwrapBolt => ProcessBolt
,其中UnwrapBolt
获取您的一个数据源并将它们作为单独的元组发出,然后ProcessBolt
获取每个单独的元组并处理随心所欲。