gpt4 book ai didi

java - 异步写入似乎在 Cassandra 中被破坏

转载 作者:搜寻专家 更新时间:2023-10-30 21:26:41 24 4
gpt4 key购买 nike

在将 900 万行的批处理写入 12 节点的 cassandra (2.1.2) 集群时,我遇到了 spark-cassandra-connector (1.0.4, 1.1.0) 的问题。我正在以一致性 ALL 写入并以一致性 ONE 读取,但每次读取的行数都不同于 900 万(8.865.753、8.753.213 等)。

我检查了连接器的代码,没有发现任何问题。然后,我决定独立于 spark 和连接器编写自己的应用程序来调查问题(唯一的依赖项是 datastax-driver-code 版本 2.1.3)。

完整代码、启动脚本和配置文件现在可以是 found on github .

在伪代码中,我编写了两个不同版本的应用程序,同步版本:

try (Session session = cluster.connect()) {

String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);

for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys

BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);

session.execute(bound);
}

}

异步的:

try (Session session = cluster.connect()) {

List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);

for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys

while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}

BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);

futures.add(session.executeAsync(bound));
}

while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}

最后一个与非批量配置情况下连接器使用的类似。

应用程序的两个版本在所有情况下都工作相同,除非负载很高。

例如,当在 9 台机器(45 个线程)上运行具有 5 个线程的同步版本时,向集群写入 900 万行,我在后续读取中找到了所有行(使用 spark-cassandra-connector)。

如果我使用每台机器 1 个线程(9 个线程)运行异步版本,执行速度会快得多,但我无法在后续读取中找到所有行(与 spark-cassandra-connector 出现的问题相同)。

代码在执行过程中没有抛出异常。

问题的原因可能是什么?

我添加了一些其他结果(感谢评论):

  • 9 台机器上有 9 个线程的异步版本,每个线程有 5 个并发写入器(45 个并发写入器):没有问题
  • 在 9 台机器上使用 90 个线程同步版本(每个 JVM 实例 10 个线程):没有问题

问题似乎开始出现在异步写入和多个并发写入器 > 45 和 <= 90 时,所以我做了其他测试以确保发现是正确的:

  • 将 ResultSetFuture 的“get”方法替换为“getUninterruptibly”:同样的问题。
  • 9 台机器上有 18 个线程的异步版本,有 5 个并发每个线程的作者数(90 个并发作者):没有问题

最后的发现表明,大量并发编写器 (90) 并不是第一次测试中预期的问题。问题是使用同一 session 的大量异步写入。

在同一 session 上有 5 个并发异步写入时,问题不存在。如果我将并发写入数增加到 10,一些操作会在没有通知的情况下丢失。

如果您在同一 session 上同时发出多个 (>5) 写入操作,那么 Cassandra 2.1.2(或 Cassandra Java 驱动程序)中的异步写入似乎会被破坏。

最佳答案

尼古拉和我本周末通过电子邮件进行了交流,我想我会在这里提供我当前理论的更新。我看了一下 github project Nicola 分享并试验了 EC2 上的 8 节点集群。

我能够使用 2.1.2 重现该问题,但确实观察到一段时间后我可以重新执行 spark 作业并返回所有 900 万行。

我似乎注意到,当节点处于压缩状态时,我并没有得到全部 900 万行。一时兴起,我看了看 change log for 2.1并观察到一个问题 CASSANDRA-8429 - "Some keys unreadable during compaction"这或许可以解释这个问题。

看到问题已针对 2.1.3 解决,我针对 cassandra-2.1 分支重新运行测试并在压缩 Activity 发生时运行计数作业,并返回了 900 万行。

我想对此进行更多试验,因为我对 cassandra-2.1 分支的测试相当有限,而且压缩 Activity 可能纯属巧合,但我希望这可以解释这些问题。

关于java - 异步写入似乎在 Cassandra 中被破坏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27667228/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com