使用 Tika 进行 Storm Crawler 配置以进行递归爬网

Storm Crawler Configuration with Tika for recursive crawls

我想在我的拓扑结构中包含 tika 解析器。我在配置中将 jsoup.treat.non.html.as.error 设置为 false,并且按照风暴爬虫文档中的描述设置了 tika 拓扑。

抓取拓扑设置如下:

builder.setSpout("spout", new MemorySpout(testURLs));

builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout");

builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key"));

builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch");

builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap");

builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup");

builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika");

builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt")
                    .localOrShuffleGrouping("tika");

builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName)
                    .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
                    .localOrShuffleGrouping("shunt", Constants.StatusStreamName)
                    .localOrShuffleGrouping("tika", Constants.StatusStreamName)
                    .localOrShuffleGrouping("indexer", Constants.StatusStreamName);

return submit("crawl", conf, builder);

使用此拓扑,我收到了无效拓扑异常。问题似乎是由状态螺栓引起的。因为,当我排除状态螺栓时,我的爬网拓扑可以正常工作。我应该如何配置状态螺栓?

第一个连接缺少 'fetch',您应该连接 'jsoup' 而不是 'shunt',后者不会向状态流发出外链:它只是发送 JSoup 无法处理的元组供 Tika 使用的特定流。请参阅 StatusStream wiki 了解一些背景信息。

下面的定义应该有效。

builder.setBolt("status", new MemoryStatusUpdater()).
    .localOrShuffleGrouping("fetch", Constants.StatusStreamName)
    .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
    .localOrShuffleGrouping("jsoup", Constants.StatusStreamName)
    .localOrShuffleGrouping("tika", Constants.StatusStreamName)
    .localOrShuffleGrouping("indexer", Constants.StatusStreamName);

这是假设您的 HBaseIndexer 扩展了 AbstractIndexerBolt 并将元组发送到状态流。

请注意,MemoryStatusUpdater 主要用于测试和调试:如果您有多个 worker,它不一定会工作,并且如果重新启动 worker 进程,则拓扑会丢失其数据。