gpt4 book ai didi

java - 获得 Cassandra Writes 背压的最佳方法是什么?

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:12:44 26 4
gpt4 key购买 nike

我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用 maxRequestsPerConnectionmaxConnectionsPerHost 设置了我的 Cassandra 集群。但是,在测试中我发现,当我达到 maxConnectionsPerHostmaxRequestsPerConnection 时,对 session.executeAsync 的调用不会阻塞。

我现在正在做的是使用 new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) 并在每个异步请求之前递增它,并在 executeAsync 返回的 future 完成时递减它.这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

有没有人想出更好的解决方案来解决这个问题?

一个警告:我希望一个请求在完成之前被视为未完成。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性超时)是我想要背压并停止使用队列消息的主要情况。

问题:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}

当前解决方案:

constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}

另外,有人能看出这个解决方案有什么明显的问题吗?

最佳答案

一个不杀死集群的可能想法是“限制”你对 executeAsync 的调用,例如在一批 100 个(或任何最适合您的集群和工作负载的数字)之后,您将在客户端代码中 hibernate 并对所有 100 个 future 进行阻塞调用(或使用 Guava 库转换 future 列表进入列表的 future )

这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有异步查询都成功,然后再继续。如果您在调用 future.get() 时捕获到任何异常,您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已尝试重试。

关于来自服务器的背压信号,从CQL二进制协议(protocol)V3开始,有一个错误代码通知客户端协调器过载:https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

从客户端,您可以通过两种方式获取此重载信息:

关于java - 获得 Cassandra Writes 背压的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35323856/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com