
java - Why does httpcomponents slow down my topology after processing the first tuple?


I have built a Storm topology that receives tuples from Apache Kafka via a kafka-spout, writes this data (using another bolt) as strings to a .txt file on the local system, and afterwards sends it via my PostBolt.

Both bolts are connected to the Kafka spout.

If I test the topology without the PostBolt, everything works fine. But if I add that bolt to the topology, the whole topology gets blocked for some reason.

Has anyone run into the same problem, or can anyone give me a hint about what is causing this?

I have read that CloseableHttpClient and CloseableHttpResponse can have issues that block threads from working... Could that be the same problem here?


Code of my PostBolt:

public class PostBolt extends BaseRichBolt {

    private CloseableHttpClient httpclient;

    @Override
    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //empty for now
    }

    @Override
    public final void execute(Tuple tuple) {

        //create HttpClient:
        httpclient = HttpClients.createDefault();
        String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
        HttpPost post = new HttpPost(url);

        post.setHeader("str1", "TEST TEST TEST");

        try {
            CloseableHttpResponse postResponse;
            postResponse = httpclient.execute(post);
            System.out.println(postResponse.getStatusLine());
            System.out.println("=====sending POST=====");
            HttpEntity postEntity = postResponse.getEntity();
            //do something useful with the response body
            //and ensure that it is fully consumed
            EntityUtils.consume(postEntity);
            postResponse.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("HttpPost"));
    }
}

Code of my Topology:

public static void main(String[] args) throws Exception {

    /**
     * create a config for Kafka-Spout (and Kafka-Bolt)
     */
    Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    //setup zookeeper connection
    String zkConnString = "localhost:2181";
    //define Kafka topic for the spout
    String topic = "mytopic";
    //assign the zookeeper connection to brokerhosts
    BrokerHosts hosts = new ZkHosts(zkConnString);

    //setting up spout properties
    SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" + topic, UUID.randomUUID().toString());
    kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    /**
     * Build the Topology by linking the spout and bolts together
     */
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
    builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout");
    builder.setBolt("post-bolt", new PostBolt()).shuffleGrouping("kafka-spout");

    /**
     * Check if we're running locally or on a real cluster
     */
    if (args != null && args.length > 0) {
        config.setNumWorkers(6);
        config.setNumAckers(6);
        config.setMaxSpoutPending(100);
        config.setMessageTimeoutSecs(20);
        StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology());
    } else {
        config.setMaxTaskParallelism(3);
        config.setNumWorkers(6);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormKafkaTopology", config, builder.createTopology());
        //Utils.sleep(100000);
        //cluster.killTopology("StormKafkaTopology");
        //cluster.shutdown();
    }
}

Best Answer

It seems to me you have already answered your own question, but yes... according to this answer you should use a PoolingHttpClientConnectionManager, because you will be running in a multithreaded environment.
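A minimal sketch of wiring a PoolingHttpClientConnectionManager into the client, intended to be called once from `prepare()`; the pool sizes here are illustrative, not tuned values:

```java
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class PooledClientFactory {

    public static CloseableHttpClient create() {
        //one pooled connection manager, shared by all executor threads of the bolt
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(20);           //illustrative: max connections in the whole pool
        cm.setDefaultMaxPerRoute(10); //illustrative: max connections per target host

        return HttpClients.custom()
                .setConnectionManager(cm)
                .build();
    }
}
```

Creating the client this way in `prepare()` (instead of calling `HttpClients.createDefault()` inside `execute()` for every tuple) means connections are reused rather than leaked, which is what otherwise stalls the topology after the first tuple.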

Edit:

public class PostBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(PostBolt.class);
    private CloseableHttpClient httpclient;
    private OutputCollector _collector;

    @Override
    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        //create the client once per bolt instance, not once per tuple
        httpclient = HttpClients.createDefault();
        _collector = collector;
    }

    @Override
    public final void execute(Tuple tuple) {
        String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
        HttpPost post = new HttpPost(url);
        post.setHeader("str1", "TEST TEST TEST");

        //try-with-resources closes the response even if an exception is thrown
        try (CloseableHttpResponse postResponse = httpclient.execute(post)) {
            LOG.info(postResponse.getStatusLine().toString());
            LOG.info("=====sending POST=====");
            HttpEntity postEntity = postResponse.getEntity();
            //do something useful with the response body
            //and ensure that it is fully consumed
            EntityUtils.consume(postEntity);
        } catch (Exception e) {
            LOG.error("PostBolt execute error", e);
            _collector.reportError(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("HttpPost"));
    }
}

Regarding "java - Why does httpcomponents slow down my topology after processing the first tuple?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41037964/
