- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
在将 900 万行的批处理写入 12 节点的 cassandra (2.1.2) 集群时,我遇到了 spark-cassandra-connector (1.0.4, 1.1.0) 的问题。我正在以一致性 ALL 写入并以一致性 ONE 读取,但每次读取的行数都不同于 900 万(8.865.753、8.753.213 等)。
我检查了连接器的代码,没有发现任何问题。然后,我决定独立于 spark 和连接器编写自己的应用程序来调查问题(唯一的依赖项是 datastax-driver-code 版本 2.1.3)。
完整代码、启动脚本和配置文件现在可以是 found on github .
在伪代码中,我编写了两个不同版本的应用程序,同步版本:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}
异步的:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}
最后一个与非批量配置情况下连接器使用的类似。
应用程序的两个版本在所有情况下都工作相同,除非负载很高。
例如,当在 9 台机器(45 个线程)上运行具有 5 个线程的同步版本时,向集群写入 900 万行,我在后续读取中找到了所有行(使用 spark-cassandra-connector)。
如果我使用每台机器 1 个线程(9 个线程)运行异步版本,执行速度会快得多,但我无法在后续读取中找到所有行(与 spark-cassandra-connector 出现的问题相同)。
代码在执行过程中没有抛出异常。
问题的原因可能是什么?
我添加了一些其他结果(感谢评论):
问题似乎开始出现在异步写入和多个并发写入器 > 45 和 <= 90 时,所以我做了其他测试以确保发现是正确的:
最后的发现表明,大量并发编写器 (90) 并不是第一次测试中预期的问题。问题是使用同一 session 的大量异步写入。
在同一 session 上有 5 个并发异步写入时,问题不存在。如果我将并发写入数增加到 10,一些操作会在没有通知的情况下丢失。
如果您在同一 session 上同时发出多个 (>5) 写入操作,那么 Cassandra 2.1.2(或 Cassandra Java 驱动程序)中的异步写入似乎会被破坏。
最佳答案
尼古拉和我本周末通过电子邮件进行了交流,我想我会在这里提供我当前理论的更新。我看了一下 github project Nicola 分享并试验了 EC2 上的 8 节点集群。
我能够使用 2.1.2 重现该问题,但确实观察到一段时间后我可以重新执行 spark 作业并返回所有 900 万行。
我似乎注意到,当节点处于压缩状态时,我并没有得到全部 900 万行。一时兴起,我看了看 change log for 2.1并观察到一个问题 CASSANDRA-8429 - "Some keys unreadable during compaction"这或许可以解释这个问题。
看到问题已针对 2.1.3 解决,我针对 cassandra-2.1 分支重新运行测试并在压缩 Activity 发生时运行计数作业,并返回了 900 万行。
我想对此进行更多试验,因为我对 cassandra-2.1 分支的测试相当有限,而且压缩 Activity 可能纯属巧合,但我希望这可以解释这些问题。
关于java - 异步写入似乎在 Cassandra 中被破坏,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27667228/
我们有 2 个 cassandra 集群,第一个有旧数据,第二个有新数据。 现在我们想要将旧数据从第一个集群移动或复制到第二个集群。什么是最好的方法来做到这一点以及如何做到这一点? 我们正在使用 DS
我正在考虑安装 OpsCenter 来监控我们在 RackSpace VM 上运行的 24 节点 Cassandra 集群。过去我听说 OpsCenter 减慢了集群速度。我有点担心 OpsCente
假设我有一个复制因子(RF)= 2 的 2 节点集群。 我使用一致性 2 触发插入。当客户端等待响应时,Cassandra 开始写入这 2 个节点。中间一个节点失败,无法完成写入,而另一节点上的写入成
已结束。此问题正在寻求书籍、工具、软件库等的推荐。它不满足 Stack Overflow guidelines 。目前不接受答案。 我们不允许提出寻求书籍、工具、软件库等推荐的问题。您可以编辑问题,以
我在 Cassandra 中有一个表,其中我用 1000 多个条目填充了一些行(每行有 10000 多列)。行中的条目更新非常频繁,基本上只是一个字段(它是一个整数)被更新为不同的值。列的所有其他值保
当Cassandra端有“掉落的突变”时,它是否向调用客户端返回相应的失败?或者即使在服务器端丢弃相应的突变并导致数据丢失,它总是成功响应调用事务的调用客户端? 在一个特定实例中,当我们的 TPS 约
我有一个 Multi-Tenancy 应用程序,其中 tenantId 将成为每个查询的一部分,因此我将其放入所有表的分区键中。 例子: CREATE TABLE users { tenantId t
根据 Datastax 文档,在 Cassandra 中先读后写是一种反模式。 每当我们在 CQLSH 中使用 UPDATE 或使用 Datastax 驱动程序来设置几列(带有 IF 和集合更新)时,
是否有命令或任何方式可以知道 Cassandra 的哪些节点上存储了哪些数据? 我对 Cassandra 很陌生,在谷歌上搜索这个问题并没有多少运气。 谢谢! 最佳答案 您可以使用 nodetool
我们有一个包含 1500 万条记录的表,而我们的表是一个 10 节点的 cassandra 集群。我们有一列有接近 20 个可重复值。是否建议在此列上建立二级索引? 最佳答案 假设在该列上完全均匀分布
Cassandra 发布了它的 technical limitations但没有提到允许的最大列数。是否有最大列数?我需要存储 400 多个字段。这在 Cassandra 中可能吗? 最佳答案 每行的
我想知道当表中有多个非 PK 列时会发生什么。我读过这个例子: http://johnsanda.blogspot.co.uk/2012/10/why-i-am-ready-to-move-to-cq
我有两个关于 Cassandra 查询结果的问题。 当我在 Cassandra 中对表进行“完全”选择(即 select * from table )时,是否保证结果将按分区标记的递增顺序返回? 例如
我无法为 Cassandra 设置 Hector。我已经浏览了 documentation和 Cassandra wiki .这些文档的问题在于,那里的很多信息都已经过时或过时(或者我缺乏知识)。无论
我正在使用 DataStax Enterprise 中 cassandra 中提供的压力测试。如果有人知道的话,我也想要一些关于它和 cassandra 的信息。 - 首先,压力测试使用哪些节点?我的
当我在 CQL 中创建表时,列的顺序是否必须精确 不是 在主键和 中不是 聚类列: CREATE TABLE user ( a ascii, b ascii, c ascii,
我有一张如下表: CREATE TABLE tab( categoryid text, id text, name text, author text, des
我正在尝试学习 Cassandra,但对术语感到困惑。 很多情况下它表示该行存储键/值对。 但是,当我定义一个表时,它更像是声明一个 SQL 表,即;您创建一个表并指定列名和数据类型。 谁能澄清一下?
如何对 cassandra 数据实现审计? 我正在寻找一个开源选项。 cassandra 是否有任何有助于审计的功能? 我可以使用触发器将记录记录到表中吗?我关注了 Triggers示例并且能够将记录
我遇到了一个问题“me.prettyprint.hector.api.exceptions.HUnavailableException:: 可能没有足够的副本来处理一致性级别。”当我有 RF=1 时,
我是一名优秀的程序员,十分优秀!