使用 Tika 进行 Storm Crawler 配置以进行递归爬网
Storm Crawler Configuration with Tika for recursive crawls
我想在我的拓扑结构中包含 tika 解析器。我在配置中将 jsoup.treat.non.html.as.error
设置为 false
,并且按照风暴爬虫文档中的描述设置了 tika 拓扑。
抓取拓扑设置如下:
builder.setSpout("spout", new MemorySpout(testURLs));
builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout");
builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key"));
builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch");
builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap");
builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup");
builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika");
builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt")
.localOrShuffleGrouping("tika");
builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("shunt", Constants.StatusStreamName)
.localOrShuffleGrouping("tika", Constants.StatusStreamName)
.localOrShuffleGrouping("indexer", Constants.StatusStreamName);
return submit("crawl", conf, builder);
使用此拓扑,我收到了无效拓扑异常。问题似乎是由状态螺栓引起的。因为,当我排除状态螺栓时,我的爬网拓扑可以正常工作。我应该如何配置状态螺栓?
第一个连接缺少 'fetch',您应该连接 'jsoup' 而不是 'shunt',后者不会向状态流发出外链:它只是发送 JSoup 无法处理的元组供 Tika 使用的特定流。请参阅 StatusStream wiki 了解一些背景信息。
下面的定义应该有效。
builder.setBolt("status", new MemoryStatusUpdater()).
.localOrShuffleGrouping("fetch", Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("jsoup", Constants.StatusStreamName)
.localOrShuffleGrouping("tika", Constants.StatusStreamName)
.localOrShuffleGrouping("indexer", Constants.StatusStreamName);
这是假设您的 HBaseIndexer 扩展了 AbstractIndexerBolt 并将元组发送到状态流。
请注意,MemoryStatusUpdater 主要用于测试和调试:如果您有多个 worker,它不一定会工作,并且如果重新启动 worker 进程,则拓扑会丢失其数据。
我想在我的拓扑结构中包含 tika 解析器。我在配置中将 jsoup.treat.non.html.as.error
设置为 false
,并且按照风暴爬虫文档中的描述设置了 tika 拓扑。
抓取拓扑设置如下:
builder.setSpout("spout", new MemorySpout(testURLs));
builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout");
builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key"));
builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch");
builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap");
builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup");
builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika");
builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt")
.localOrShuffleGrouping("tika");
builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("shunt", Constants.StatusStreamName)
.localOrShuffleGrouping("tika", Constants.StatusStreamName)
.localOrShuffleGrouping("indexer", Constants.StatusStreamName);
return submit("crawl", conf, builder);
使用此拓扑,我收到了无效拓扑异常。问题似乎是由状态螺栓引起的。因为,当我排除状态螺栓时,我的爬网拓扑可以正常工作。我应该如何配置状态螺栓?
第一个连接缺少 'fetch',您应该连接 'jsoup' 而不是 'shunt',后者不会向状态流发出外链:它只是发送 JSoup 无法处理的元组供 Tika 使用的特定流。请参阅 StatusStream wiki 了解一些背景信息。
下面的定义应该有效。
builder.setBolt("status", new MemoryStatusUpdater()).
.localOrShuffleGrouping("fetch", Constants.StatusStreamName)
.localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
.localOrShuffleGrouping("jsoup", Constants.StatusStreamName)
.localOrShuffleGrouping("tika", Constants.StatusStreamName)
.localOrShuffleGrouping("indexer", Constants.StatusStreamName);
这是假设您的 HBaseIndexer 扩展了 AbstractIndexerBolt 并将元组发送到状态流。
请注意,MemoryStatusUpdater 主要用于测试和调试:如果您有多个 worker,它不一定会工作,并且如果重新启动 worker 进程,则拓扑会丢失其数据。