gpt4 book ai didi

java - 使用datastax java驱动程序异步写入cassandra的有效方法?

转载 作者:行者123 更新时间:2023-11-30 10:35:38 24 4
gpt4 key购买 nike

我正在使用 datastax java 驱动程序 3.1.0 连接到 cassandra 集群,我的 cassandra 集群版本是 2.0.10。我正在以 QUORUM 一致性异步编写。

  public void save(final String process, final int clientid, final long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);

ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}

@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, Executors.newFixedThreadPool(10));
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}

下面是我的 CacheStatement 类:

public class CacheStatement {
private static final Map<String, PreparedStatement> cache =
new ConcurrentHashMap<>();

private static class Holder {
private static final CacheStatement INSTANCE = new CacheStatement();
}

public static CacheStatement getInstance() {
return Holder.INSTANCE;
}

private CacheStatement() {}

public BoundStatement getStatement(String cql) {
Session session = CassUtils.getInstance().getSession();
PreparedStatement ps = cache.get(cql);
// no statement cached, create one and cache it now.
if (ps == null) {
synchronized (this) {
ps = cache.get(cql);
if (ps == null) {
cache.put(cql, session.prepare(cql));
}
}
}
return ps.bind();
}
}

我上面的 save 方法将从多个线程调用,我认为 BoundStatement 不是线程安全的。顺便说一句,StatementCache 类是线程安全的,如上所示。

  • 因为 BoundStatement 不是线程安全的。如果我从多个线程异步编写,我上面的代码会不会有问题?
  • 其次,我在 addCallback 参数中使用了 Executors.newFixedThreadPool(10)。这样可以吗,还是会有什么问题?或者我应该使用 MoreExecutors.directExecutor。那这两者有什么区别呢?最好的方法是什么?

以下是我使用 datastax java 驱动程序连接到 cassandra 的连接设置:

Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
.withPoolingOptions(poolingOptions)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
.withCredentials(username, password).build();

最佳答案

我觉得你做的很好。您可以通过在应用程序启动时准备所有语句来进一步优化,因此您已经缓存了所有内容,因此在“保存”时准备语句不会对性能造成任何影响,并且您不会在工作流程中锁定任何内容。

BoundStatement 不是线程安全的,但是 PreparedStatement 是的,每次调用 getStatement< 时都会返回一个新的 BoundStatement/。事实上,PreparedStatement.bind() 函数实际上是new BoundStatement(ps).bind() 的快捷方式。而且您没有从多个线程访问相同 BoundStatement。所以你的代码没问题。

相反,对于线程池,您实际上是在每个 addCallback 函数上创建一个新的线程池。这是一种资源浪费。我不使用此回调方法,我更喜欢自己管理纯 FutureResultSet,但我看到了 examples在使用 MoreExecutors.sameThreadExecutor() 而不是 MoreExecutors.directExecutor() 的 datastax 文档上。

关于java - 使用datastax java驱动程序异步写入cassandra的有效方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41030680/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com