
java - Cassandra + Hector: force a compaction in a test to check that empty rows are removed

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 20:10:17

We want to test that if a column has a TTL (time to live) set, it will eventually be removed from Cassandra entirely, together with the empty row that contained it.

As I understand it, the algorithm for testing this behaviour is:

  • when saving the object, set a TTL on the column
  • wait for the TTL to pass, and check that the returned value is null
  • wait for the GC_GRACE_SECONDS period to pass
  • check that the row has been removed as well

It's the last item that I couldn't verify.
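The behaviour that last step checks can be modelled in miniature. The following is a purely illustrative toy in-memory model with a manual clock, not Cassandra itself: an expired column vanishes from reads immediately, but its row key lingers as an empty "ghost" row until a compaction runs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of Cassandra's TTL semantics (illustrative only).
public class TtlModel {
    private long now = 0; // manual clock, in milliseconds
    // rowKey -> (columnName -> expiry time in ms)
    private final Map<String, Map<String, Long>> rows = new HashMap<>();

    void tick(long millis) { now += millis; }

    void saveWithTtl(String rowKey, String column, long ttlMillis) {
        rows.computeIfAbsent(rowKey, k -> new HashMap<>())
            .put(column, now + ttlMillis);
    }

    // Reads hide expired columns immediately...
    boolean columnLive(String rowKey, String column) {
        Long expiry = rows.getOrDefault(rowKey, new HashMap<>()).get(column);
        return expiry != null && expiry > now;
    }

    // ...but range scans still return the expired rows' keys: "ghost" rows.
    int countRows() { return rows.size(); }

    // Only a compaction physically drops rows whose columns have all expired.
    void compact() {
        rows.entrySet().removeIf(e ->
                e.getValue().values().stream().allMatch(exp -> exp <= now));
    }
}
```

The real test below has the same shape: after the TTL passes, the column read returns null but the row count stays unchanged until a compaction removes the ghost rows.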

As I found out (e.g. here, here, and elsewhere), I need to run a compaction. Similar questions have been asked before (e.g. Hector (Cassandra) Delete Anomaly), but I didn't find anything useful in them, and googling didn't help much either.

So the question is: how can I force a compaction from my integration test (which uses Hector) to make sure this works as expected? Or is there some other way to do it?

P.S. Truncating the column family is not an option.


Here are the details.

My test:

private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";

private static final int GC_GRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr",
        "localhost:9160");

private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster,
            new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE,
            COLUMN_FAMILY, GC_GRACE_SECONDS);
}

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";
    logger.info("before persisting rows count is {}", countRows());

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);

    logger.info("after persisting rows count is {}", countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);

    logger.info("before TTL passes rows count is {}", countRows());

    TimeUnit.SECONDS.sleep(6);

    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);

    logger.info("after TTL passes rows count is {}", countRows());

    TimeUnit.SECONDS.sleep(10);

    logger.info("wait 10 more seconds... rows count is {}", countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);

    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}", countRows);
    assertEquals(0, countRows);
}

The persistence code:

public void persistObjectWithTtl(Object rowKey, Object columnName,
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj,
            SERIALIZER, SERIALIZER);
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}

Setting GcGraceSeconds for the column family:

private void addColumnFamily(String keySpaceName, String columnFamilyName,
        int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition =
            createColumnFamilyDefinition(keySpaceName, columnFamilyName);

    ThriftCfDef columnFamilyWithGCGraceSeconds =
            new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}

And the row-counting code, found on SO:

public int countRows() {
    final int pageSize = 100;

    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer,
                    serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(pageSize);

    Object lastKey = null;

    int count = 0;
    while (true) {
        rangeSlicesQuery.setKeys(lastKey, null);

        QueryResult<OrderedRows<Object, Object, Object>> result =
                rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

        // Range queries are start-inclusive, so on every page after the
        // first one skip lastKey: it was already counted.
        if (lastKey != null && rowsIterator.hasNext()) {
            rowsIterator.next();
        }

        // Count every returned key, including empty ("ghost") rows --
        // those are exactly the rows that should disappear after
        // gc_grace_seconds plus a compaction.
        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            count++;
        }

        if (rows.getCount() < pageSize) {
            break;
        }
    }

    return count;
}

Thanks.


UPDATE:

The cause was that there wasn't enough data for a compaction to run, so I needed to write more data and flush the table to disk more often. I ended up with the following test case:

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    waitAtMost(Duration.TWO_MINUTES)
            .pollDelay(Duration.TWO_SECONDS)
            .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
            .until(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    int countRows = countRows();
                    logger.info("the rows count is {}", countRows);
                    return countRows < expectedAmount;
                }
            });
}

Full code: test class and sut

Best Answer

Since you're using Java, you can easily force a compaction through JMX by calling the forceTableCompaction(keyspace, columnFamily) operation of the org.apache.cassandra.db.StorageService MBean.
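A minimal sketch of the JMX route, assuming the default Cassandra JMX port 7199 and the 1.x-era MBean name and operation signature (when the test runs an embedded Cassandra in the same JVM, calling StorageService.instance directly, as the update above does, is simpler):

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CompactionTrigger {

    // Cassandra's StorageService MBean name (the one nodetool talks to).
    static ObjectName storageServiceName() throws Exception {
        return new ObjectName("org.apache.cassandra.db:type=StorageService");
    }

    // Connect over JMX and invoke forceTableCompaction(keyspace, columnFamilies...)
    // on the StorageService MBean of a remote (or local) node.
    public static void forceCompaction(String host, int port,
                                       String keyspace, String... columnFamilies)
            throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            mbs.invoke(storageServiceName(), "forceTableCompaction",
                    new Object[] { keyspace, columnFamilies },
                    new String[] { String.class.getName(),
                                   String[].class.getName() });
        }
    }
}
```

Usage would be something like `CompactionTrigger.forceCompaction("localhost", 7199, "KEYSPACE", "COLUMN_FAMILY");` after the gc_grace period has passed.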

Regarding java - Cassandra + Hector, forcing a compaction in a test to check that empty rows are removed, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/14644596/
