gpt4 book ai didi

java - 无法在 Camel HTTP 组件中配置 "Keep Alive"

转载 作者:搜寻专家 更新时间:2023-11-01 03:47:57 26 4
gpt4 key购买 nike

我在正确设置 HTTP 组件时遇到了一些问题。目前,微服务从提供者那里提取 JSON 内容,对其进行处理并将其发送到下一个服务以进行进一步处理。主要问题是这个微服务创建了大量的 CLOSE_WAIT 套接字连接。我理解“KEEP-ALIVE”的整个概念应该保持连接打开直到我关闭它,但服务器可能会由于某些原因断开连接并创建此 CLOSE_WAIT 套接字。

我创建了一个用于调试/测试目的的小型服务,该服务向 Google 发出 GET 调用,但在我关闭程序之前,即使此连接也保持打开状态。我尝试了很多不同的解决方案:

  • .setHeader("Connection", constant("Close"))
  • -Dhttp.keepAlive=false 作为 VM 参数
  • 从 Camel-Http 切换到 Camel-Http4
  • httpClient.soTimeout=500(Camel-HTTP)、httpClient.socketTimeout=500 和 connectionTimeToLive=500(Camel-HTTP4)
  • .setHeader("Connection", simple("Keep-Alive")) 和.setHeader("Keep-Alive", simple("timeout=10")) (Camel-HTTP4)
  • 通过调试 DefaultConnectionKeepAliveStrategy 的响应从 -1(永无止境)到 Camel-HTTP4 中的特定值进行设置 - 这可行,但我无法注入(inject)自己的策略。

但是我没有成功。所以也许你们中的一个可以帮助我:

  • 我如何告诉 Camel-HTTP 它应该在特定时间过去后关闭连接?例如,该服务每小时从内容提供商那里拉取一次。 3-4 小时后,HttpComponent 应在拉取后关闭连接,并在下一次拉取时重新打开它。目前,每个连接都将放回 MultiThreadedHttpConnectionManager 并且套接字仍处于打开状态。
  • 如果使用 Camel-HTTP 无法做到这一点:我如何将 HttpClientBuilder 注入(inject)到我的路由创建中?我知道应该可以通过 httpClient 选项实现,但我不了解文档的特定部分。

谢谢大家的帮助

最佳答案

不幸的是,在应用程序最终关闭之前,没有一个建议的答案解决了我这边的 CLOSE_WAIT 连接状态。

我用下面的测试用例重现了这个问题:

public class HttpInvokationTest extends CamelSpringTestSupport {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@EndpointInject(uri = "mock:success")
private MockEndpoint successEndpoint;
@EndpointInject(uri = "mock:failure")
private MockEndpoint failureEndpoint;

@Override
protected AbstractApplicationContext createApplicationContext() {
return new AnnotationConfigApplicationContext(ContextConfig.class);
}

@Configuration
@Import(HttpClientSpringTestConfig.class)
public static class ContextConfig extends CamelConfiguration {

@Override
public List<RouteBuilder> routes() {
List<RouteBuilder> routes = new ArrayList<>(1);
routes.add(new RouteBuilder() {
@Override
public void configure() {
from("direct:start")
.log(LoggingLevel.INFO, LOG, CONFIDENTIAL, "Invoking external URL: ${header[ERPEL_URL]}")
.setHeader("Connection", constant("close"))
.recipientList(header("TEST_URL"))
.log(LoggingLevel.DEBUG, "HTTP response code: ${header["+Exchange.HTTP_RESPONSE_CODE+"]}")
.bean(CopyBodyToHeaders.class)
.choice()
.when(header(Exchange.HTTP_RESPONSE_CODE).isGreaterThanOrEqualTo(300))
.to("mock:failure")
.otherwise()
.to("mock:success");
}
});
return routes;
}
}

@Test
public void testHttpInvocation() throws Exception {
successEndpoint.expectedMessageCount(1);
failureEndpoint.expectedMessageCount(0);

ProducerTemplate template = context.createProducerTemplate();

template.sendBodyAndHeader("direct:start", null, "TEST_URL", "http4://meta.stackoverflow.com");

successEndpoint.assertIsSatisfied();
failureEndpoint.assertIsSatisfied();

Exchange exchange = successEndpoint.getExchanges().get(0);
Map<String, Object> headers = exchange.getIn().getHeaders();
String body = exchange.getIn().getBody(String.class);
for (String key : headers.keySet()) {
LOG.info("Header: {} -> {}", key, headers.get(key));
}
LOG.info("Body: {}", body);

Thread.sleep(120000);
}
}

并发出 netstat -ab -p tcp | grep 151.101.129.69 请求,其中 IP 是 meta.stackoverflow.com 之一。

这给出了如下响应:

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   ESTABLISHED      37562       2118
tcp4 0 0 192.168.0.10.52182 151.101.129.69.http ESTABLISHED 885 523

紧随其后

tcp4       0      0  192.168.0.10.52183     151.101.129.69.https   CLOSE_WAIT       37562       2118
tcp4 0 0 192.168.0.10.52182 151.101.129.69.http CLOSE_WAIT 885 523

响应直到应用程序由于 Connection: keep-alive header 而关闭,即使使用如下配置也是如此:

@Configuration
@EnableConfigurationProperties(HttpClientSettings.class)
public class HttpClientSpringTestConfig {

private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Resource
private HttpClientSettings httpClientSettings;

@Resource
private CamelContext camelContext;

private SocketConfig httpClientSocketConfig() {
/*
socket timeout:
Monitors the time passed between two consecutive incoming messages over the connection and
raises a SocketTimeoutException if no message was received within the given timeout interval
*/
LOG.info("Creating a SocketConfig with a socket timeout of {} seconds", httpClientSettings.getSoTimeout());
return SocketConfig.custom()
.setSoTimeout(httpClientSettings.getSoTimeout() * 1000)
.setSoKeepAlive(false)
.setSoReuseAddress(false)
.build();
}

private RequestConfig httpClientRequestConfig() {
/*
connection timeout:
The time span the application will wait for a connection to get established. If the connection
is not established within the given amount of time a ConnectionTimeoutException will be raised.
*/
LOG.info("Creating a RequestConfig with a socket timeout of {} seconds and a connection timeout of {} seconds",
httpClientSettings.getSoTimeout(), httpClientSettings.getConTimeout());
return RequestConfig.custom()
.setConnectTimeout(httpClientSettings.getConTimeout() * 1000)
.setSocketTimeout(httpClientSettings.getSoTimeout() * 1000)
.build();
}

@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
ConnectionKeepAliveStrategy myStrategy = new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
return 5 * 1000;
}
};

PoolingHttpClientConnectionManager conMgr =
new PoolingHttpClientConnectionManager();
conMgr.closeIdleConnections(5, TimeUnit.SECONDS);

return builder -> builder.setDefaultSocketConfig(httpClientSocketConfig())
.setDefaultRequestConfig(httpClientRequestConfig())
.setConnectionTimeToLive(5, TimeUnit.SECONDS)
.setKeepAliveStrategy(myStrategy)
.setConnectionManager(conMgr);
}

@PostConstruct
public void init() {
LOG.debug("Initializing HTTP clients");
HttpComponent httpComponent = camelContext.getComponent("http4", HttpComponent.class);
httpComponent.setHttpClientConfigurer(httpConfiguration());
HttpComponent httpsComponent = camelContext.getComponent("https4", HttpComponent.class);
httpsComponent.setHttpClientConfigurer(httpConfiguration());
}
}

或直接在各自的 HttpComponent 上定义设置。

在检查 HttpClient 代码中各自提出的方法时,很明显这些方法是单次操作,而不是 HttpClient 内部每隔几毫秒检查一次的配置。

PoolingHttpClientConnectionManager 进一步指出:

The handling of stale connections was changed in version 4.4. Previously, the code would check every connection by default before re-using it. The code now only checks the connection if the elapsed time since the last use of the connection exceeds the timeout that has been set. The default timeout is set to 2000ms

只有在尝试重新使用连接时才会发生,这对连接池很有意义,尤其是在通过同一连接交换多条消息的情况下。对于单次调用,它应该更像 Connection: close 可能在一段时间内不会重用该连接,保持连接打开或半关闭,因为没有进一步尝试从该连接中读取并因此认识到连接可以关闭。

我注意到我已经用传统的 HttpClients 解决了这个问题,并开始将这个解决方案移植到 Camel,这很容易解决。

该解决方案基本上包括使用服务注册 HttpClients,然后定期(在我的例子中为 5 秒)调用 closeExpiredConnections()closeIdleConnections(...)

这个逻辑保存在一个单例枚举中,因为这实际上是在几个应用程序使用的库中,每个应用程序都在自己的 JVM 中运行。

/**
* This singleton monitor will check every few seconds for idle and stale connections and perform
* a cleanup on the connections using the registered connection managers.
*/
public enum IdleConnectionMonitor {

INSTANCE;

private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/** The execution service which runs the cleanup every 5 seconds **/
private ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, new NamingThreadFactory());
/** The actual thread which performs the monitoring **/
private IdleConnectionMonitorThread monitorThread = new IdleConnectionMonitorThread();

IdleConnectionMonitor() {
// execute the thread every 5 seconds till the application is shutdown (or the shutdown method
// is invoked)
executorService.scheduleAtFixedRate(monitorThread, 5, 5, TimeUnit.SECONDS);
}

/**
* Registers a {@link HttpClientConnectionManager} to monitor for stale connections
*/
public void registerConnectionManager(HttpClientConnectionManager connMgr) {
monitorThread.registerConnectionManager(connMgr);
}

/**
* Request to stop the monitoring for stale HTTP connections.
*/
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(3, TimeUnit.SECONDS)) {
LOG.warn("Connection monitor shutdown not finished after 3 seconds!");
}
} catch (InterruptedException iEx) {
LOG.warn("Execution service was interrupted while waiting for graceful shutdown");
}
}

/**
* Upon invocation, the list of registered connection managers will be iterated through and if a
* referenced object is still reachable {@link HttpClientConnectionManager#closeExpiredConnections()}
* and {@link HttpClientConnectionManager#closeIdleConnections(long, TimeUnit)} will be invoked
* in order to cleanup stale connections.
* <p/>
* This runnable implementation holds a weakly referable list of {@link
* HttpClientConnectionManager} objects. If a connection manager is only reachable by {@link
* WeakReference}s or {@link PhantomReference}s it gets eligible for garbage collection and thus
* may return null values. If this is the case, the connection manager will be removed from the
* internal list of registered connection managers to monitor.
*/
private static class IdleConnectionMonitorThread implements Runnable {

// we store only weak-references to connection managers in the list, as the lifetime of the
// thread may extend the lifespan of a connection manager and thus allowing the garbage
// collector to collect unused objects as soon as possible
private List<WeakReference<HttpClientConnectionManager>> registeredConnectionManagers =
Collections.synchronizedList(new ArrayList<>());

@Override
public void run() {

LOG.trace("Executing connection cleanup");
Iterator<WeakReference<HttpClientConnectionManager>> conMgrs =
registeredConnectionManagers.iterator();
while (conMgrs.hasNext()) {
WeakReference<HttpClientConnectionManager> weakConMgr = conMgrs.next();
HttpClientConnectionManager conMgr = weakConMgr.get();
if (conMgr != null) {
LOG.trace("Found connection manager: {}", conMgr);
conMgr.closeExpiredConnections();
conMgr.closeIdleConnections(30, TimeUnit.SECONDS);
} else {
conMgrs.remove();
}
}
}

void registerConnectionManager(HttpClientConnectionManager connMgr) {
registeredConnectionManagers.add(new WeakReference<>(connMgr));
}
}

private static class NamingThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Connection Manager Monitor");
return t;
}
}
}

如前所述,此单例服务生成一个自己的线程,该线程每 5 秒调用上述两个方法。这些调用负责关闭在一定时间内未使用或在规定时间内处于空闲状态的连接。

为了 camel 化此服务,可以利用 EventNotifierSupport 让 Camel 在关闭时负责关闭监视器线程。

/**
* This Camel service with take care of the lifecycle management of {@link IdleConnectionMonitor}
* and invoke {@link IdleConnectionMonitor#shutdown()} once Camel is closing down in order to stop
* listening for stale connetions.
*/
public class IdleConnectionMonitorService extends EventNotifierSupport {

private final static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private IdleConnectionMonitor connectionMonitor;

@Override
public void notify(EventObject event) {
if (event instanceof CamelContextStartedEvent) {
LOG.info("Start listening for closable HTTP connections");
connectionMonitor = IdleConnectionMonitor.INSTANCE;
} else if (event instanceof CamelContextStoppingEvent){
LOG.info("Shutting down listener for open HTTP connections");
connectionMonitor.shutdown();
}
}

@Override
public boolean isEnabled(EventObject event) {
return event instanceof CamelContextStartedEvent || event instanceof CamelContextStoppingEvent;
}

public IdleConnectionMonitor getConnectionMonitor() {
return this.connectionMonitor;
}
}

为了利用该服务,HttpClient Camel 在内部使用的连接管理器需要向该服务注册,这在下面的代码块中完成:

private void registerHttpClientConnectionManager(HttpClientConnectionManager conMgr) {
if (!getIdleConnectionMonitorService().isPresent()) {
// register the service with Camel so that on a shutdown the monitoring thread will be stopped
camelContext.getManagementStrategy().addEventNotifier(new IdleConnectionMonitorService());
}
IdleConnectionMonitor.INSTANCE.registerConnectionManager(conMgr);
}

private Optional<IdleConnectionMonitorService> getIdleConnectionMonitorService() {
for (EventNotifier eventNotifier : camelContext.getManagementStrategy().getEventNotifiers()) {
if (eventNotifier instanceof IdleConnectionMonitorService) {
return Optional.of((IdleConnectionMonitorService) eventNotifier);
}
}
return Optional.empty();
}

最后但同样重要的是,在我的例子中,HttpClientSpringTestConfig 中的 httpConfiguration 中定义的连接管理器需要经过引入的注册函数

PoolingHttpClientConnectionManager conMgr = new PoolingHttpClientConnectionManager();
registerHttpClientConnectionManager(conMgr);

这可能不是最漂亮的解决方案,但它确实关闭了我机器上的半关闭连接。


@编辑

我刚刚了解到您可以使用 NoConnectionReuseStrategy 将连接状态更改为 TIME_WAIT 而不是 CLOSE_WAIT 并因此在短暂的一刻。不幸的是,发出的请求仍然带有 Connection: keep-alive header 。此策略将为每个请求创建一个新连接,即如果您有一个 301 Moved Permanently 重定向响应,则重定向将发生在一个新连接上。

httpClientConfigurer bean 需要更改为以下内容才能使用上述策略:

@Bean(name = "httpClientConfigurer")
public HttpClientConfigurer httpConfiguration() {
return builder -> builder.setDefaultSocketConfig(socketConfig)
.setDefaultRequestConfig(requestConfig)
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);
}

关于java - 无法在 Camel HTTP 组件中配置 "Keep Alive",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38701588/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com