风暴螺栓无法从喷口反序列化对象
Storm bolt can't deserialize object from spout
我正在使用 Storm 1.1.2、JDK 8(Storm 不喜欢 JDK 9 编译代码)、Kafka 0.11 和 Docker撰写。
我们的想法是拥有一个容器化服务,该服务可以接收 REST 调用以创建风暴拓扑,然后将它们提交到风暴集群。一切都在本地工作,但将拓扑提交从提交到本地集群移动到 StormSubmitter
会导致问题。我已经解决了其中的大部分问题,但是有一个奇怪的序列化问题。
我有一个从 Kafka 成功读取的 spout。它读取 Protobuf 对象的字节数组,并使用自定义反序列化器从中创建消息。我有两个不同的螺栓从这个 spout 读取,一个打印传入消息(螺栓 A),另一个根据字段过滤消息并将它们发送到另一个螺栓以进行聚合(螺栓 B)。
我注意到这两个螺栓之间的唯一区别是螺栓 B 有构造函数而螺栓 A 没有。
出于某种原因,bolt A 从 spout 接收消息并打印它们没有问题,但每次消息到达 bolt B 时都会抛出异常 com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): my.package.MyProtobufMessage
。我看到您可以为 类 注册序列化程序,但为什么螺栓 A 能够处理消息而螺栓 B 不能?
也是一个单独的问题,但是当我添加第三个拓扑时,nimbus 没有为其分配主管。一个拓扑将有 2 个工作人员和 9 个执行人员,第二个拓扑将有 2 个工作人员和 6 个执行人员,然后我将添加第三个拓扑,它将显示在 UI 和 Nimbus 日志中,但不会主管日志。在 UI 中,第三个拓扑将有 0 个 worker、exec 和 0 个分配的 mem
您可能 "lucky" 因为 spout 和 bolt A 在同一个 worker 中,而 bolt B 在另一个 worker 中。 Storm 不会序列化元组,除非它们被转移到另一个 worker,这可能是 bolt A 可以读取消息的原因。
关于第三个拓扑问题,你需要确保你的主管有足够的工作槽来满足你的拓扑。每个主管在 Storm 配置 (storm.yaml) 中定义它愿意 运行 的工作 JVM 数量。我猜前两个拓扑占用了所有插槽。
主管的默认配置是
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
允许每个主管有 4 个工作 JVM。由于不共享工作 JVM,如果您运行宁 2 个拓扑每个占用 2 个工作人员,那么您已经用完了所有插槽。您可以添加更多插槽或更多主管计算机,或者减少拓扑所需的工作人员数量。
我正在使用 Storm 1.1.2、JDK 8(Storm 不喜欢 JDK 9 编译代码)、Kafka 0.11 和 Docker撰写。
我们的想法是拥有一个容器化服务,该服务可以接收 REST 调用以创建风暴拓扑,然后将它们提交到风暴集群。一切都在本地工作,但将拓扑提交从提交到本地集群移动到 StormSubmitter
会导致问题。我已经解决了其中的大部分问题,但是有一个奇怪的序列化问题。
我有一个从 Kafka 成功读取的 spout。它读取 Protobuf 对象的字节数组,并使用自定义反序列化器从中创建消息。我有两个不同的螺栓从这个 spout 读取,一个打印传入消息(螺栓 A),另一个根据字段过滤消息并将它们发送到另一个螺栓以进行聚合(螺栓 B)。
我注意到这两个螺栓之间的唯一区别是螺栓 B 有构造函数而螺栓 A 没有。
出于某种原因,bolt A 从 spout 接收消息并打印它们没有问题,但每次消息到达 bolt B 时都会抛出异常 com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): my.package.MyProtobufMessage
。我看到您可以为 类 注册序列化程序,但为什么螺栓 A 能够处理消息而螺栓 B 不能?
也是一个单独的问题,但是当我添加第三个拓扑时,nimbus 没有为其分配主管。一个拓扑将有 2 个工作人员和 9 个执行人员,第二个拓扑将有 2 个工作人员和 6 个执行人员,然后我将添加第三个拓扑,它将显示在 UI 和 Nimbus 日志中,但不会主管日志。在 UI 中,第三个拓扑将有 0 个 worker、exec 和 0 个分配的 mem
您可能 "lucky" 因为 spout 和 bolt A 在同一个 worker 中,而 bolt B 在另一个 worker 中。 Storm 不会序列化元组,除非它们被转移到另一个 worker,这可能是 bolt A 可以读取消息的原因。
关于第三个拓扑问题,你需要确保你的主管有足够的工作槽来满足你的拓扑。每个主管在 Storm 配置 (storm.yaml) 中定义它愿意 运行 的工作 JVM 数量。我猜前两个拓扑占用了所有插槽。
主管的默认配置是
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
允许每个主管有 4 个工作 JVM。由于不共享工作 JVM,如果您运行宁 2 个拓扑每个占用 2 个工作人员,那么您已经用完了所有插槽。您可以添加更多插槽或更多主管计算机,或者减少拓扑所需的工作人员数量。