gpt4 book ai didi

performance - Cassandra:如何使用 CQL 插入性能良好的新宽行

转载 作者:行者123 更新时间:2023-12-04 10:07:30 26 4
gpt4 key购买 nike

我正在评估 cassandra。我正在使用 datastax 驱动程序和 CQL。

我想存储一些具有以下内部结构的数据,其中每次更新的名称都不同。

+-------+-------+-------+-------+-------+-------+
| | name1 | name2 | name3 | ... | nameN |
| time +-------+-------+-------+-------+-------+
| | val1 | val2 | val3 | ... | valN |
+-------+-------+-------+-------|-------+-------+

所以时间应该是列键,名称应该是行键。我用来创建这个表的 CQL 语句是:
CREATE TABLE IF NOT EXISTS test.wide (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE

我希望架构采用这种方式以便于查询。我还必须偶尔存储超过 65000 行的更新。所以使用 cassandra list/set/map 数据类型不是一个选项。

我必须能够每秒处理至少 1000 个宽行插入,并且名称/值对的数量不同但数量很大(~1000)。

问题如下:我编写了一个简单的基准测试,它执行 1000 个宽行插入,每个行插入 10000 个名称/值对。我使用 CQL 和 datastax 驱动程序的性能非常慢,而不使用 CQL(使用 astyanax)的版本在同一测试集群上具有良好的性能。

我读过这篇 related question ,并且在这个问题的公认答案中表明您应该能够使用 原子地快速创建一个新的宽行批量预处理语句 ,它们在 cassandra 2 中可用。

所以我尝试使用这些,但我的性能仍然很慢(对于在本地主机上运行的小型三节点集群,每秒插入两次)。我是否遗漏了一些明显的东西,还是必须使用较低级别的 thrift API? 我已经在 astyanax 中使用 ColumnListMutation 实现了相同的插入,并且每秒获得大约 30 个插入。

如果我必须使用较低级别的 thrift API:
  • 它实际上已被弃用,还是因为它的级别较低而使用起来不方便?
  • 我能用 CQL 查询用 thrift api 创建的表吗?

  • 下面是 Scala 中的一个自包含代码示例。它只是创建一个批处理语句,用于插入具有 10000 列的宽行,并重复乘以插入性能。

    我使用了 BatchStatement 的选项和一致性级别,但没有什么能让我获得更好的性能。

    我唯一的解释是,尽管批处理由准备好的语句组成,但条目还是被逐行添加到行中。
    package cassandra

    import com.datastax.driver.core._

    object CassandraTestMinimized extends App {

    val keyspace = "test"
    val table = "wide"
    val tableName = s"$keyspace.$table"

    def createKeyspace = s"""
    CREATE KEYSPACE IF NOT EXISTS ${keyspace}
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """

    def createWideTable = s"""
    CREATE TABLE IF NOT EXISTS ${tableName} (
    time varchar,
    name varchar,
    value varchar,
    PRIMARY KEY (time,name))
    WITH COMPACT STORAGE
    """

    def writeTimeNameValue(time: String) = s"""
    INSERT INTO ${tableName} (time, name, value)
    VALUES ('$time', ?, ?)
    """

    val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
    val session = cluster.connect()

    session.execute(createKeyspace)
    session.execute(createWideTable)

    for(i<-0 until 1000) {
    val entries =
    for {
    i <- 0 until 10000
    name = i.toString
    value = name
    } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
    }

    def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
    .prepare(writeTimeNameValue(time.toString))
    .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
    batch.add(template.bind(k, v))
    batch
    }
    }

    这是 astyanax 代码(从 astyanax example 修改而来),​​它基本上以 15 倍的速度执行相同的操作。请注意,这也不使用异步调用,因此这是一个公平的比较。这要求列族已经存在,因为我还没有弄清楚如何使用 astyanax 创建它,并且该示例没有任何用于创建列族的代码。
    package cassandra;

    import java.util.Iterator;

    import com.netflix.astyanax.ColumnListMutation;
    import com.netflix.astyanax.serializers.AsciiSerializer;
    import com.netflix.astyanax.serializers.LongSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.netflix.astyanax.AstyanaxContext;
    import com.netflix.astyanax.Keyspace;
    import com.netflix.astyanax.MutationBatch;
    import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
    import com.netflix.astyanax.connectionpool.OperationResult;
    import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
    import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
    import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
    import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
    import com.netflix.astyanax.model.Column;
    import com.netflix.astyanax.model.ColumnFamily;
    import com.netflix.astyanax.model.ColumnList;
    import com.netflix.astyanax.thrift.ThriftFamilyFactory;

    public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
    logger.debug("init()");

    context = new AstyanaxContext.Builder()
    .forCluster("Test Cluster")
    .forKeyspace("test1")
    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
    .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
    )
    .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
    .setPort(9160)
    .setMaxConnsPerHost(1)
    .setSeeds("127.0.0.1:9160")
    )
    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
    .setCqlVersion("3.0.0")
    .setTargetCassandraVersion("2.0.5"))
    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
    .buildKeyspace(ThriftFamilyFactory.getInstance());

    context.start();
    keyspace = context.getClient();

    EMP_CF = ColumnFamily.newColumnFamily(
    EMP_CF_NAME,
    LongSerializer.get(),
    AsciiSerializer.get());
    }

    public void insert(long time) {
    MutationBatch m = keyspace.prepareMutationBatch();

    ColumnListMutation<String> x =
    m.withRow(EMP_CF, time);
    for(int i=0;i<10000;i++)
    x.putColumn(Integer.toString(i), Integer.toString(i));

    try {
    @SuppressWarnings("unused")
    Object result = m.execute();
    } catch (ConnectionException e) {
    logger.error("failed to write data to C*", e);
    throw new RuntimeException("failed to write data to C*", e);
    }
    logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
    OperationResult<ColumnList<String>> result;
    try {
    result = keyspace.prepareQuery(EMP_CF)
    .getKey(time)
    .execute();

    ColumnList<String> cols = result.getResult();
    // process data

    // a) iterate over columsn
    for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
    Column<String> c = i.next();
    String v = c.getStringValue();
    System.out.println(c.getName() + " " + v);
    }

    } catch (ConnectionException e) {
    logger.error("failed to read from C*", e);
    throw new RuntimeException("failed to read from C*", e);
    }
    }

    public static void main(String[] args) {
    AstClient c = new AstClient();
    c.init();
    long t00 = System.nanoTime();
    for(int i=0;i<1000;i++) {
    long t0 = System.nanoTime();
    c.insert(i);
    long dt = System.nanoTime() - t0;
    System.out.println((1.0e9/dt) + " " + i);
    }
    long dtt = System.nanoTime() - t00;

    c.read(0);
    System.out.println(dtt / 1e9);
    }

    }

    更新:我在 cassandra-user 上找到了这个帖子邮件列表。在进行大的宽行插入时,CQL 似乎存在性能问题。有票 CASSANDRA-6737跟踪此问题。

    更新 2:我已经尝试了附加到 CASSANDRA-6737 的补丁,我可以确认这个补丁完全解决了这个问题。感谢 DataStax 的 Sylvain Lebresne 如此迅速地解决了这个问题!

    最佳答案

    您的代码中有一个错误,我认为这可以解释您所看到的许多性能问题:对于每个批次,您都需要再次准备语句。准备一个语句并不是非常昂贵,但是这样做会增加很多延迟。您等待该语句准备好所花费的时间是您不构建批处理的时间,而 Cassandra 不花费处理该批处理的时间。准备好的语句只需要准备一次并且应该被重复使用。

    我认为很多糟糕的性能都可以解释为延迟问题。瓶颈很可能是您的应用程序代码,而不是 Cassandra。即使您只准备一次该语句,您仍然会花费大部分时间在应用程序中受 CPU 限制(构建大批量)或不做任何事情(等待网络和 Cassandra)。

    您可以做两件事:首先使用 CQL 驱动程序的异步 API 并在网络和 Cassandra 忙于您刚刚完成的那个时构建下一个批处理;其次尝试运行多个线程做同样的事情。您必须试验的确切线程数取决于您拥有的核心数以及您是否在同一台机器上运行一个或三个节点。

    在同一台机器上运行三节点集群会使集群比运行单个节点更慢,而在不同机器上运行会使其更快。在同一台机器上运行应用程序也没有完全帮助。如果你想测试性能,要么只运行一个节点,要么在不同的机器上运行一个真正的集群。

    批处理可以为您提供额外的性能,但并非总是如此。它们可能会导致您在测试代码中看到的那种问题:缓冲区膨胀。一旦批处理变得太大,您的应用程序将花费太多时间构建它们,然后将太多时间推送到网络上,以及等待 Cassandra 处理它们的时间太多。您需要对批量大小进行试验,看看哪种效果最好(但使用真实的集群来做,否则您将看不到网络的影响,当您的批量变大时,这将是一个重要因素)。

    如果您使用批处理,请使用压缩。压缩在大多数请求负载中没有区别(响应是另一回事),但是当您发送大量批次时,它会产生很大的不同。

    Cassandra 中的宽行写入没有什么特别之处。除了一些异常(exception),模式不会改变处理写入所需的时间。我运行的应用程序每秒执行数以万计的非批处理混合宽行和非宽行写入。集群并不大,每个只有三四个 m1.xlarge EC2 节点。诀窍是在发送下一个请求之前永远不要等待请求返回(这并不意味着一劳永逸,只需以相同的异步方式处理响应)。延迟是性能杀手。

    关于performance - Cassandra:如何使用 CQL 插入性能良好的新宽行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21778671/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com