当我将元组发送到 ElasticSearch 时,为什么我的风暴拓扑没有确认

Why are my storm topology not acking when i send tuple to ElasticSearch

我是 Storm 的新手,我刚开始参加数据架构师培训课程,正是在这种情况下,我遇到了今天带给您的问题。

我正在通过名为 CurrentPriceSpout 的 KafkaSpout 接收来自 kakfa 的消息。到目前为止,一切正常。然后,在我的 CurrentPriceBolt 中,我重新发布了一个元组,以便使用 EsCurrentPriceBolt 将我的数据写入 ElasticSearch。问题就在这里。我无法直接将数据写入 ElasticSearch,只有在删除拓扑时才会写入。

是否有 Storm 参数可以通过检索确认来强制写入元组?

我尝试通过添加选项“.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)”,元组在 ElasticSearch 中写得很好,但未被确认。所以 Storm 会无限期地重写它们。

感谢您的帮助 蒂埃里

我设法找到了问题的答案。 主要问题是 ES 的设计目的不是为了吸收研究项目中生成的那么少的数据。默认情况下,ES 以 1000 个条目为一组写入数据。在这个项目中,我每 30 秒生成一个数据,或者每 500 分钟(或 8h20)生成一批 1000 个数据。

所以我详细检查了我的拓扑配置并尝试了以下选项:

  • es.batch.size.entries: 1
  • es.storm.bolt.flush.entries.size: 1
  • topology.producer.batch.size: 1
  • topology.transfer.batch.size: 1

现在是这样的:

...
...

public class App 
{
    ...    
    ...    

    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
    {
        ...
        ...

        StormTopology topology  = topologyBuilder.createTopology();                 // je crée ma topologie Storm
        String topologyName     = properties.getProperty("storm.topology.name");    // je nomme ma topologie
        StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology);               // je démarre ma topologie sur mon cluster storm
        System.out.println( "Topology on remote cluster : Started!" );              
    }


    private static Config getTopologyConfig(Properties properties)
    {
        Config stormConfig = new Config();
        stormConfig.put("topology.workers",                 Integer.parseInt(properties.getProperty("topology.workers")));
        stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
        stormConfig.put("topology.message.timeout.secs",    Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
        stormConfig.put("topology.transfer.batch.size",     Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
        stormConfig.put("topology.producer.batch.size",     Integer.parseInt(properties.getProperty("topology.producer.batch.size")));      
        return stormConfig;
    }

    ...    
    ...    
    ...    
}

现在可以使用了!!!