风暴场分组

Storm fields grouping

我遇到了以下情况:

事实是,可视化螺栓总是相同的,但它会为可以作为其输入的每种类型的螺栓发送一条不同的 header 消息。例如:

我的问题是,我是否应该创建 3 个独立的实例,每个实例都有一个线程,例如

builder.setBolt("forSum", new VisualizationBolt(),1).globalGrouping("bolt-sum");
builder.setBolt("forDif", new VisualizationBolt(),1).globalGrouping("bolt-dif");
builder.setBolt("forMul", new VisualizationBolt(),1).globalGrouping("bolt-mul");

或者我应该执行以下操作

builder.setBolt("forAll", new VisualizationBolt(),3)
.fieldsGrouping("forSum", new Fields("type"))
.fieldsGrouping("forDif", new Fields("type"))
.fieldsGrouping("forMul", new Fields("type"));

并从前面的每个螺栓发出类型,因此可以根据它对它们进行分组?

有什么优势?

此外,我是否应该期望每次 bolt-sum 都会转到第一个可视化螺栓,bolt-dif 会转到第二个可视化螺栓,而 bolt-mul 会转到第三个可视化螺栓?不会混的?

我认为应该是这样,但目前我的实现中没有,所以我不确定这是一个错误还是我遗漏了什么?

第一种使用三个实例的方法是正确的方法。使用 fieldsGrouping 确实 而不是 确保 "sum" 值转到 "Sum-Visualization-Bolt" 并且 sum/diff/mul 值也不是不同的(即,在不同的螺栓实例)。

fieldGrouping 的语义更宽松:它只保证所有相同类型的元组将由单个 bolt 实例处理,即永远不会是这样,两个不同的螺栓实例获得相同的类型。

我猜你可以使用部分键分组 (partialKeyGrouping)。在 Storm documentation about stream groups 上说:

Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.

我使用这个分组实现了一个简单的拓扑,与 fieldsGrouping 相比,Graphite 服务器上的图表显示了更好的负载平衡。完整的源代码是 here.

topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TYPE.getValue(), new SensorAggregateValuesWindowBolt().withTumblingWindow(Duration.seconds(5)), 2)
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        // .fieldsGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_01.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .partialKeyGrouping(MqttSensors.SPOUT_STATION_02.getValue(), new Fields(MqttSensors.FIELD_SENSOR_TYPE.getValue()))
        .setNumTasks(4) // This will create 4 Bolt instances 
        .addConfiguration(TagSite.SITE.getValue(), TagSite.EDGE.getValue())
        ;