gpt4 book ai didi

java - 如何使用 Datastax Java 驱动程序的异步/批量写入功能

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:34:33 27 4
gpt4 key购买 nike

我计划使用 Datastax Java 驱动程序写入 Cassandra。我主要对 Datastax Java 驱动程序的 Batch WritesAsycnhronous 功能感兴趣,但我不能获得任何可以解释我如何将这些功能合并到我下面使用 Datastax Java 驱动程序的代码中的教程..

/**
* Performs an upsert of the specified attributes for the specified id.
*/
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

try {

// make a sql here using the above input parameters.

String sql = sqlPart1.toString()+sqlPart2.toString();

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);

BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

DatastaxConnection.getSession().execute(query);

} catch (InvalidQueryException e) {
LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
} catch (Exception e) {
LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
}
}

在下面的代码中,我使用 Datastax Java 驱动程序创建到 Cassandra 节点的连接。

/**
* Creating Cassandra connection using Datastax Java driver
*
*/
private DatastaxConnection() {

try{
builder = Cluster.builder();
builder.addContactPoint("some_nodes");

builder.poolingOptions().setCoreConnectionsPerHost(
HostDistance.LOCAL,
builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

cluster = builder
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.build();

StringBuilder s = new StringBuilder();
Set<Host> allHosts = cluster.getMetadata().getAllHosts();
for (Host h : allHosts) {
s.append("[");
s.append(h.getDatacenter());
s.append(h.getRack());
s.append(h.getAddress());
s.append("]");
}
System.out.println("Cassandra Cluster: " + s.toString());

session = cluster.connect("testdatastaxks");

} catch (NoHostAvailableException e) {
e.printStackTrace();
throw new RuntimeException(e);
} catch (Exception e) {

}
}

任何人都可以帮助我如何向我的上述代码添加批量写入或异步功能。感谢您的帮助。

我正在运行 Cassandra 1.2.9

最佳答案

对于异步,它就像使用 executeAsync 函数一样简单:

...
DatastaxConnection.getSession().executeAsync(query);

对于批处理,您需要构建查询(我使用字符串,因为编译器知道如何很好地优化字符串连接):

String cql =  "BEGIN BATCH "
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

附带说明一下,我认为您对语句的绑定(bind)可能看起来像这样,假设您将属性更改为映射列表,其中每个映射代表批处理中的更新/插入:

BoundStatement query = prepStatement.bind(userId,
attributesList.get(0).values().toArray(new Object[attributes.size()]),
userId_2,
attributesList.get(1).values().toArray(new Object[attributes.size()]));

关于java - 如何使用 Datastax Java 驱动程序的异步/批量写入功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19202812/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com