gpt4 book ai didi

multithreading - 同时使用 CQLSSTableWriter

转载 作者:行者123 更新时间:2023-12-01 15:45:34 24 4
gpt4 key购买 nike

我正在尝试根据 Spark 中的批处理计算结果创建 Cassandra SSTable。理想情况下,每个分区都应该为其保存的数据创建 SSTable,以便尽可能并行化进程(甚至可能将其流式传输到 Cassandra 环)

在遇到 CQLSSTableWriter 的初始障碍后(比如需要 yaml 文件),我现在面临这个问题:

java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts
at org.apache.cassandra.config.Schema.load(Schema.java:347)
at org.apache.cassandra.config.Schema.load(Schema.java:112)
at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

我正在像这样在每个并行分区上创建一个写入器:

def store(rdd:RDD[Message]) = {
rdd.foreachPartition( msgIterator => {
val writer = CQLSSTableWriter.builder()
.inDirectory("/tmp/cass")
.forTable(schema)
.using(insertSttmt).build()
msgIterator.foreach(msg => {...})
})}

如果我正确读取异常,我只能在一个 JVM 中为每个表创建一个编写器。我猜写给 writer 的内容将不是线程安全的,即使它们是多个线程通过让所有并行任务同时尝试将几 GB 数据转储到磁盘而产生的争用也会无论如何都会破坏使用 SSTables 进行批量上传的目的。

那么,有没有办法同时使用CQLSSTableWriter

如果不是,那么在 Cassandra 中以高吞吐量加载批处理数据的下一个最佳选择是什么?

最佳答案

正如您所观察到的,单个写入器只能串行使用(如果不这样做,将发生 ConcurrentModificationExceptions),并且由于 SSTableWriter 使用的 Cassandra 代码中的静态模式构造,在 JVM 中创建多个写入器会失败。

除了生成多个 JVM 之外,我不知道有任何解决方法,每个 JVM 都写入一个单独的目录。

我们已提交 Cassandra JIRA 票证来解决此问题。

https://issues.apache.org/jira/browse/CASSANDRA-7463

关于multithreading - 同时使用 CQLSSTableWriter,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24396902/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com