- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在使用 datastax java 驱动程序 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在以 QUORUM 一致性异步编写。
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我上面的保存方法将以非常快的速度从多个线程调用。
问题:
我想限制对异步写入 Cassandra 的 executeAsync
方法的请求。如果我的写入速度超过我的 Cassandra 集群可以处理的速度,那么它将开始抛出错误,我希望我的所有写入都应该成功地进入 Cassandra 而没有任何损失。
我看到了这个post解决方案是使用具有固定数量许可的 Semaphore
。但我不确定如何以及什么是最好的实现方式。我以前从未使用过信号量。这是逻辑。谁能提供一个基于我的代码的信号量示例,或者如果有任何更好的方法/选项,那么也请告诉我。
In the context of writing a dataloader program, you could do something like the following:
- To keep things simple use a Semaphore or some other construct with a fixed number of permits (that will be your maximum number of inflight requests). Whenever you go to submit a query using executeAsync, acquire a permit. You should really only need 1 thread (but may want to introduce a pool of # cpu cores size that does this) that acquires the permits from the Semaphore and executes queries. It will just block on acquire until there is an available permit.
- Use Futures.addCallback for the future returned from executeAsync. The callback should call Sempahore.release() in both onSuccess and onFailure cases. By releasing a permit, this should allow your thread in step 1 to continue and submit the next request.
我还看到了其他几个 post他们在哪里讨论过使用 RingBuffer
或 Guava RateLimitter
那么我应该使用哪个更好?以下是我能想到的选项:
谁能帮我举例说明我们如何限制请求或获得 cassandra 写入的背压并确保所有写入成功进入 cassandra?
最佳答案
不是权威答案,但也许会有所帮助。首先,您应该考虑当无法立即执行查询时您会怎么做。无论您选择哪种速率限制,如果您收到的请求速率高于您可以写入 Cassandra 的速率,最终您的进程都会被等待请求阻塞。在那一刻,您需要告诉您的客户暂缓他们的请求(“推迟”)。例如。如果他们是通过 HTTP 来的,那么响应状态将是 429“请求太多”。如果您在同一进程中生成请求,则决定可接受的最长超时时间。也就是说,如果 Cassandra 跟不上,那么就该扩展(或调整)它了。
也许在实现速率限制之前,值得在调用 save
方法(使用 Thread.sleep(...))之前试验并在线程中添加人为延迟,看看它是否解决了您的问题或还需要其他东西。
查询返回错误是来自 Cassandra 的背压。但您可以选择或实现 RetryPolicy确定何时重试失败的查询。
你也可以看看connection pool options (尤其是 Monitoring and tuning the pool )。一可调异步个数requests per connection .但是文档说,对于 Cassandra 2.x,此参数上限为 128,不应更改它(不过我会尝试一下:)
信号量的实现看起来像
/* Share it among all threads or associate with a thread for per-thread limits
Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20);
public void save(String process, int clientid, long deviceid) {
....
queryPermits.acquire(); // Blocks until a permit is available
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
queryPermits.release();
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
queryPermits.release(); // Permit should be released in all cases.
logger.logError("error= ", t);
}
}, executorService);
....
}
(在实际代码中,我会创建一个装饰器来调用包装方法,然后释放许可。)
Guava 的 RateLimiter 类似于信号量,但允许在未充分利用期后临时爆发并根据时间限制请求(而不是 Activity 查询的总数)。
但是无论如何,请求都会因各种原因而失败,因此最好制定一个如何重试它们的计划(以防出现间歇性错误)。
它可能不适合你的情况,但我会尝试使用一些队列或缓冲区来排队请求(例如 java.util.concurrent.ArrayBlockingQueue
)。 “缓冲区已满”意味着客户应该等待或放弃请求。缓冲区也将用于重新排队失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试。当队列已满并且同时有新的失败请求时,还应该以某种方式处理这种情况。然后,单线程工作人员将从队列中挑选请求并将它们发送到 Cassandra。因为它不应该做太多,所以它不太可能成为瓶颈。该工作人员还可以应用自己的速率限制,例如基于 com.google.common.util.concurrent.RateLimiter
的计时。
如果想尽可能避免丢失消息,他可以在 Cassandra 前面放置一个具有持久性的消息代理(例如 Kafka)。这样传入的消息甚至可以在 Cassandra 长时间中断后继续存在。但是,我想,这对你来说太过分了。
关于java - 使用 "executeAsync"时如何限制对 cassandra 的写入请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41049753/
我的 RestSharp 实现存在以下问题。如何让我的应用程序在继续之前等待 ExecuteAsync() 的响应? 我尝试了不同的解决方案: 首先(该方法不等待 ExecuteAsync 响应):
我熟悉如何在我的 Google Drive 中创建文件夹,但我正在考虑使用异步方法来执行此操作。但是,在这样做时,我不确定如何获取我明确添加到我希望返回的字段中的字段。 我的代码如下: private
我在 Asp.Net Core Web API 中实现了一个 Microsoft.Extensions.Hosting.BackgroundService,它在 ExecuteAsync 内部有一个阻
对于使用 MSAL.NET v4 (nuget Microsoft.Identity.Client v4.3.0) 中的应用程序(客户端 ID 和 secret 无用户上下文)使用承载 token 进
所以,这一直有效(并且在模拟器中继续正常工作)。 当我在 Azure 中启动经典云服务时,出现以下错误。 ExecuteAsync() 位于 Web 角色中,位于 Results/ChallengeR
我对 AcquireTokenWithDeviceCode 的调用以及对返回对象使用 ExecuteAsync() 的调用永远不会返回。 deviceCodeResultCallback 中的断点按预
这是我的原始方法示例 public void insertIntoDb(SampleObject sample){ ------------------------------------
我正在尝试使用 ExecuteAsync 发出异步获取请求,但它从不回应。令我困惑的是 ExecuteAsync有效,两种同步方法也是如此 Execute和 Execute . 这是我的代码: var
我正在调用 request.executeAnsyc 以用信息填充我的数组并显示在 ListView 上。遗憾的是,在 request.executeAnsyc 完成之前调用了数组。所以我的 Frag
我正在尝试将 10000 条记录插入 Azure 表存储中。我正在使用 ExecuteAsync() 来实现它,但不知何故插入了大约 7500 条记录,其余记录丢失了。我故意不使用 await 关键字
所以我有一个 dotnet 核心工作进程,我想在某些情况下关闭工作进程。 protected override async Task ExecuteAsync(CancellationToken st
我有一个名为 Worker 的 BackgroundService,我重写了 ExecuteAsync 方法以每 10 秒运行一次。有时,我跑的东西会持续很长时间。在这种情况下,我想终止正在运行的程序
我在执行包装策略时遇到上述异常,包括:重试、断路器和隔板。 我有以下政策: var sharedBulkhead = Policy.BulkheadAsync( maxPara
如果根据 Protractor 规范,我在 browser.executeAsyncScript 中执行脚本,我应该如何告知脚本确实失败了?考虑以下对 browser.executeAsyncScri
尝试使用 Nsubstitute 为 RestClient(来自 RestSharp)模拟 ExecuteAsync 方法时遇到了困难。我看过一个使用 Moq 的示例(此处:Mocking Rests
我在异步方法中返回变量时遇到问题。我能够获取要执行的代码,但无法获取返回电子邮件地址的代码。 public async Task GetSignInName (string id) {
我正在尝试通过调用 session.executeAsync() 而不是 session.execute() 来加速我们的代码写入数据库。 我们有数据库连接可能断开的用例,目前之前的 execute(
我是 Javascript 的新手,正在尝试编写一个 firefox 附加组件。 我正在尝试将从 SQL 查询返回的数据传递/提取到调用函数。好像不行。 我搜索了有关变量范围的信息,查看了我可以在该站
我正在使用 datastax java 驱动程序 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在以 QUORUM 一致性异步编写。 pri
在我的 api Controller 操作方法中。我正在为获取请求使用内容协商。代码是: IContentNegotiator negotiator = this.Configuration.Serv
我是一名优秀的程序员,十分优秀!