gpt4 book ai didi

cassandra - Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句

转载 作者:行者123 更新时间:2023-12-02 21:25:13 24 4
gpt4 key购买 nike

阻止 com.datastax.driver.core.Session 执行方法

public ResultSet execute(Statement statement);

对此方法的评论:

This method blocks until at least some result has been received from the database. However, for SELECT queries, it does not guarantee that the result has been received in full. But it does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method.

来自 com.datastax.driver.core.Session 的非阻塞执行方法

public ResultSetFuture executeAsync(Statement statement);

This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the {@link ResultSetFuture}.

我有 02 个关于它们的问题,因此如果您能帮助我理解它们,那就太好了。

假设我有 100 万条记录,我希望所有这些记录都到达数据库(没有任何丢失)。

问题 1:如果我有 n 个线程,则所有线程都将具有需要发送到数据库的相同数量的记录。它们都使用阻塞执行调用继续向 cassandra 发送多个插入查询。如果我增加 n 的值,是否也有助于加快我需要将所有记录插入到 cassandra 的时间?

这会导致 cassandra 的性能问题吗? Cassandra 是否必须确保对于每个插入记录,集群中的所有节点都应立即了解新记录?为了保持数据的一致性。 (我假设 cassandra 节点甚至不会考虑使用本地机器时间来控制记录插入时间)。

问题2:使用非阻塞执行,如何确保所有插入都成功?我知道的唯一方法是等待 ResultSetFuture 检查插入查询的执行情况。我还有什么更好的办法吗?非阻塞执行是否比阻塞执行更容易失败?

非常感谢您的帮助。

最佳答案

If I have n number of threads, all threads will have the same amount of records they need to send to the database. All of them continue sending multiple insert queries to cassandra using blocking execute call. If I increase the value of n, will it also helps to speed up the time that I need to insert all records to cassandra?

在某种程度上。让我们稍微分离一下客户端实现细节,并从“并发请求数”的角度来看问题,因为如果使用executeAsync,则不需要为每个正在进行的请求都有一个线程。在我的测试中,我发现虽然拥有大量并发请求有很多值(value),但存在一个阈值,达到该阈值就会出现 yield 递减或性能开始下降。我的一般经验法则是(number of Nodes * native_transport_max_threads (default: 128) * 2) ,但或多或​​少您可能会发现更优化的结果。

这里的想法是,排队的请求数量超过 cassandra 一次处理的数量并没有多大值(value)。在减少正在进行的请求数量的同时,您还可以限制驱动程序客户端和 cassandra 之间连接上不必要的拥塞。

Question 2: With non-blocking execute, how can I assure that all of the insertions is successful? The only way I know is waiting for the ResultSetFuture to check the execution of the insert query. Is there any better way I can do ? Is there a higher chance that non-blocking execute is easier to fail then blocking execute?

通过 get 等待 ResultSetFuture是一种途径,但如果您正在开发完全异步的应用程序,您希望尽可能避免阻塞。使用 Guava ,你最好的两个武器是 Futures.addCallback Futures.transform .

  • Futures.addCallback允许您注册 FutureCallback 当驱动程序收到响应时执行。 onSuccess在成功的情况下执行,onFailure否则。

  • Futures.transform允许您有效地映射返回的 ResultSetFuture进入别的东西。例如,如果您只需要 1 列的值,您可以使用它来转换 ListenableFuture<ResultSet>ListenableFuture<String>无需在 ResultSetFuture 上阻止​​您的代码然后获取 String 值。

在编写数据加载程序时,您可以执行以下操作:

  1. 为了简单起见,请使用 Semaphore或具有固定数量许可的其他结构(这将是您的飞行请求的最大数量)。每当您使用 executeAsync 提交查询时,获得许可证。您实际上应该只需要 1 个线程(但可能需要引入一个 # cpu 核心大小的池来执行此操作)来从信号量获取许可并执行查询。它只会阻止获取,直到有可用的许可为止。
  2. 使用Futures.addCallback对于从 executeAsync 返回的 future 。回调应该调用 Sempahore.release()两者皆 onSuccessonFailure案例。通过释放许可证,这应该允许您在步骤 1 中的线程继续并提交下一个请求。

要进一步提高吞吐量,您可能需要考虑使用 BatchStatement并批量提交请求。如果您保持较小的批处理(50-250 是一个不错的数字)并且批处理中的插入全部共享相同的分区键,那么这是一个不错的选择。

关于cassandra - Cassandra 如何处理 datastax java 驱动程序中的阻塞执行语句,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34949292/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com