Why does httpcomponents slow down my topology after the first processing of tuples?
I have built a Storm topology that fetches tuples from Apache Kafka via a kafka-spout, writes this data as strings to a .txt file on my local system (using another bolt), and afterwards sends an HTTP POST from my PostBolt.
Both bolts are connected to the Kafka spout.
If I test the topology without the PostBolt, everything works fine. But if I add that bolt to the topology, the whole topology gets blocked for some reason.
Has anyone run into the same problem, or can anyone give me a hint as to what is causing this?
I have read that there are issues where CloseableHttpClient or CloseableHttpResponse block threads from working... could this be the same problem here?
Code of my PostBolt:
public class PostBolt extends BaseRichBolt {

    private CloseableHttpClient httpclient;

    @Override
    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //empty for now
    }

    @Override
    public final void execute(Tuple tuple) {
        //create HttpClient:
        httpclient = HttpClients.createDefault();
        String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
        HttpPost post = new HttpPost(url);
        post.setHeader("str1", "TEST TEST TEST");
        try {
            CloseableHttpResponse postResponse;
            postResponse = httpclient.execute(post);
            System.out.println(postResponse.getStatusLine());
            System.out.println("=====sending POST=====");
            HttpEntity postEntity = postResponse.getEntity();
            //do something useful with the response body
            //and ensure that it is fully consumed
            EntityUtils.consume(postEntity);
            postResponse.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("HttpPost"));
    }
}
Code of my Topology:
public static void main(String[] args) throws Exception {
    /**
     * create a config for Kafka-Spout (and Kafka-Bolt)
     */
    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

    //setup zookeeper connection
    String zkConnString = "localhost:2181";
    //define Kafka topic for the spout
    String topic = "mytopic";
    //assign the zookeeper connection to brokerhosts
    BrokerHosts hosts = new ZkHosts(zkConnString);

    //setting up spout properties
    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
    kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    /**
     * Build the Topology by linking the spout and bolts together
     */
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
    builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout");
    builder.setBolt("post-bolt", new PostBolt()).shuffleGrouping("kafka-spout");

    /**
     * Check if we're running locally or on a real cluster
     */
    if (args != null && args.length > 0) {
        config.setNumWorkers(6);
        config.setNumAckers(6);
        config.setMaxSpoutPending(100);
        config.setMessageTimeoutSecs(20);
        StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology());
    } else {
        config.setMaxTaskParallelism(3);
        config.setNumWorkers(6);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormKafkaTopology", config, builder.createTopology());
        //Utils.sleep(100000);
        //cluster.killTopology("StormKafkaTopology");
        //cluster.shutdown();
    }
}
It seems to me that you have already answered your own question, but yes... according to this answer you should use a PoolingHttpClientConnectionManager, because you will be running in a multithreaded environment.
Edit:
public class PostBolt extends BaseRichBolt {

    private static Logger LOG = LoggerFactory.getLogger(PostBolt.class);

    private CloseableHttpClient httpclient;
    private OutputCollector _collector;

    @Override
    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //create the client once per bolt instance instead of once per tuple
        httpclient = HttpClients.createDefault();
        _collector = collector;
    }

    @Override
    public final void execute(Tuple tuple) {
        String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
        HttpPost post = new HttpPost(url);
        post.setHeader("str1", "TEST TEST TEST");

        CloseableHttpResponse postResponse = null;
        try {
            postResponse = httpclient.execute(post);
            LOG.info(postResponse.getStatusLine().toString());
            LOG.info("=====sending POST=====");
            HttpEntity postEntity = postResponse.getEntity();
            //do something useful with the response body
            //and ensure that it is fully consumed
            EntityUtils.consume(postEntity);
        } catch (Exception e) {
            LOG.error("PostBolt execute error", e);
            _collector.reportError(e);
        } finally {
            if (postResponse != null) {
                try {
                    postResponse.close();
                } catch (IOException e) {
                    LOG.error("error closing response", e);
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("HttpPost"));
    }
}
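For completeness, here is a minimal sketch of how the client could instead be built on a PoolingHttpClientConnectionManager inside prepare(), as recommended above. The pool sizes are illustrative assumptions, not values from the original post:

import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

// inside PostBolt
@Override
public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    // one pool shared by all executor threads of this bolt within the worker
    PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
    cm.setMaxTotal(20);            // illustrative: max connections in the pool
    cm.setDefaultMaxPerRoute(20);  // illustrative: max connections per target host
    httpclient = HttpClients.custom()
            .setConnectionManager(cm)
            .build();
    _collector = collector;
}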
Okay, I identified the problem based on this comment:
The Kafka spout will keep re-emitting tuples that have not been acknowledged by the "endpoints" they were sent to.
So I just had to ack the incoming tuples inside my bolts, and the problem with the topology disappeared.
(I spotted the issue because the printer-bolt kept writing even though no further input was coming from the kafka-spout.) A minimal sketch of the fix is shown below.
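For reference, this is roughly what the acking looks like in execute(), assuming the OutputCollector from prepare() is stored in a _collector field as in the code above (the HTTP call itself is elided):

@Override
public final void execute(Tuple tuple) {
    try {
        // ...build and send the HTTP POST as before...
    } catch (Exception e) {
        _collector.reportError(e);
        _collector.fail(tuple);   // let the Kafka spout replay this tuple
        return;
    }
    // acknowledge the tuple so the Kafka spout does not keep re-emitting it
    _collector.ack(tuple);
}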