当我将元组发送到 ElasticSearch 时,为什么我的风暴拓扑没有确认
Why are my storm topology not acking when i send tuple to ElasticSearch
我是 Storm 的新手,我刚开始参加数据架构师培训课程,正是在这种情况下,我遇到了今天带给您的问题。
我正在通过名为 CurrentPriceSpout 的 KafkaSpout 接收来自 kakfa 的消息。到目前为止,一切正常。然后,在我的 CurrentPriceBolt 中,我重新发布了一个元组,以便使用 EsCurrentPriceBolt 将我的数据写入 ElasticSearch。问题就在这里。我无法直接将数据写入 ElasticSearch,只有在删除拓扑时才会写入。
是否有 Storm 参数可以通过检索确认来强制写入元组?
我尝试通过添加选项“.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)”,元组在 ElasticSearch 中写得很好,但未被确认。所以 Storm 会无限期地重写它们。
感谢您的帮助
蒂埃里
我设法找到了问题的答案。
主要问题是 ES 的设计目的不是为了吸收研究项目中生成的那么少的数据。默认情况下,ES 以 1000 个条目为一组写入数据。在这个项目中,我每 30 秒生成一个数据,或者每 500 分钟(或 8h20)生成一批 1000 个数据。
所以我详细检查了我的拓扑配置并尝试了以下选项:
- es.batch.size.entries: 1
- es.storm.bolt.flush.entries.size: 1
- topology.producer.batch.size: 1
- topology.transfer.batch.size: 1
现在是这样的:
...
...
public class App
{
...
...
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
...
...
StormTopology topology = topologyBuilder.createTopology(); // je crée ma topologie Storm
String topologyName = properties.getProperty("storm.topology.name"); // je nomme ma topologie
StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology); // je démarre ma topologie sur mon cluster storm
System.out.println( "Topology on remote cluster : Started!" );
}
private static Config getTopologyConfig(Properties properties)
{
Config stormConfig = new Config();
stormConfig.put("topology.workers", Integer.parseInt(properties.getProperty("topology.workers")));
stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
stormConfig.put("topology.message.timeout.secs", Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
stormConfig.put("topology.transfer.batch.size", Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
stormConfig.put("topology.producer.batch.size", Integer.parseInt(properties.getProperty("topology.producer.batch.size")));
return stormConfig;
}
...
...
...
}
现在可以使用了!!!
我是 Storm 的新手,我刚开始参加数据架构师培训课程,正是在这种情况下,我遇到了今天带给您的问题。
我正在通过名为 CurrentPriceSpout 的 KafkaSpout 接收来自 kakfa 的消息。到目前为止,一切正常。然后,在我的 CurrentPriceBolt 中,我重新发布了一个元组,以便使用 EsCurrentPriceBolt 将我的数据写入 ElasticSearch。问题就在这里。我无法直接将数据写入 ElasticSearch,只有在删除拓扑时才会写入。
是否有 Storm 参数可以通过检索确认来强制写入元组?
我尝试通过添加选项“.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)”,元组在 ElasticSearch 中写得很好,但未被确认。所以 Storm 会无限期地重写它们。
感谢您的帮助 蒂埃里
我设法找到了问题的答案。 主要问题是 ES 的设计目的不是为了吸收研究项目中生成的那么少的数据。默认情况下,ES 以 1000 个条目为一组写入数据。在这个项目中,我每 30 秒生成一个数据,或者每 500 分钟(或 8h20)生成一批 1000 个数据。
所以我详细检查了我的拓扑配置并尝试了以下选项:
- es.batch.size.entries: 1
- es.storm.bolt.flush.entries.size: 1
- topology.producer.batch.size: 1
- topology.transfer.batch.size: 1
现在是这样的:
...
...
public class App
{
...
...
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
...
...
StormTopology topology = topologyBuilder.createTopology(); // je crée ma topologie Storm
String topologyName = properties.getProperty("storm.topology.name"); // je nomme ma topologie
StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology); // je démarre ma topologie sur mon cluster storm
System.out.println( "Topology on remote cluster : Started!" );
}
private static Config getTopologyConfig(Properties properties)
{
Config stormConfig = new Config();
stormConfig.put("topology.workers", Integer.parseInt(properties.getProperty("topology.workers")));
stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
stormConfig.put("topology.message.timeout.secs", Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
stormConfig.put("topology.transfer.batch.size", Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
stormConfig.put("topology.producer.batch.size", Integer.parseInt(properties.getProperty("topology.producer.batch.size")));
return stormConfig;
}
...
...
...
}
现在可以使用了!!!