
java - Loading large rows of data into Cassandra using Java and CQLSSTableWriter

Reprinted. Author: 行者123. Updated: 2023-12-01 22:50:25

I have been trying to load a very large dataset into Cassandra using Java's SSTableLoader API, but my program keeps timing out while doing so.

I am breaking articles down into word ngrams (unigrams, bigrams, trigrams). I have a keyspace with three column families (unigrams, bigrams, trigrams). In these column families, the row key is the document ID, and for each ngram in that document a column containing that ngram is added.

So an article with the ID "article1" and the content "This is a test sentence" would produce a row that looks like:

row id   |  col  | col | col |  col  |   col
----------------------------------------------
article1 |  This |  is |  a  |  test | sentence

Here is the Java file I am using to start the SSTableWriters and add data to them:

package cassandrabulktest.cassandra;

import static NGramProperties.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;

public class NGramLoader {

    private static final String UNIGRAM_SCHEMA = "CREATE TABLE articles.unigrams (" +
            "docid text, " +
            "unigram text, " +
            "PRIMARY KEY (unigram, docid))";

    private static CQLSSTableWriter unigram_writer;

    private static final String BIGRAM_SCHEMA = "CREATE TABLE articles.bigrams (" +
            "docid text, " +
            "bigram text, " +
            "PRIMARY KEY (bigram, docid))";

    private static CQLSSTableWriter bigram_writer;

    private static final String TRIGRAM_SCHEMA = "CREATE TABLE articles.trigrams (" +
            "docid text, " +
            "trigram text, " +
            "PRIMARY KEY (trigram, docid))";

    private static CQLSSTableWriter trigram_writer;

    public static void initDirectories(String startdate, int count) {
        String[] grams = { "unigrams", "bigrams", "trigrams" };
        for (String gram : grams) {
            File f = new File(BASE_LOCATION + "/" + startdate + "/articles/" + gram + "/");
            f.mkdirs();
        }

        unigram_writer = CQLSSTableWriter.builder()
                .inDirectory(BASE_LOCATION + "/" + startdate + "/articles/unigrams/")
                .forTable(UNIGRAM_SCHEMA)
                .using("INSERT INTO articles.unigrams (docid, unigram) VALUES (?, ?)")
                .build();

        bigram_writer = CQLSSTableWriter.builder()
                .inDirectory(BASE_LOCATION + "/" + startdate + "/articles/bigrams/")
                .forTable(BIGRAM_SCHEMA)
                .using("INSERT INTO articles.bigrams (docid, bigram) VALUES (?, ?)")
                .build();

        trigram_writer = CQLSSTableWriter.builder()
                .inDirectory(BASE_LOCATION + "/" + startdate + "/articles/trigrams/")
                .forTable(TRIGRAM_SCHEMA)
                .using("INSERT INTO articles.trigrams (docid, trigram) VALUES (?, ?)")
                .build();
    }

    public static void load(String articleId, ArrayList<String> unigrams,
            ArrayList<String> bigrams, ArrayList<String> trigrams)
            throws IOException, InvalidRequestException {
        // addRow binds values in the order of the INSERT's bind markers: (docid, ngram)
        for (String unigram : unigrams) {
            unigram_writer.addRow(articleId, unigram);
        }

        for (String bigram : bigrams) {
            bigram_writer.addRow(articleId, bigram);
        }

        for (String trigram : trigrams) {
            trigram_writer.addRow(articleId, trigram);
        }
    }

    public static void closeWriter() throws IOException {
        unigram_writer.close();
        bigram_writer.close();
        trigram_writer.close();
    }
}

I call "load" for each article I iterate over, and the ArrayLists are simply the lists of ngrams that need to be added, roughly as sketched below.
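
The driver looks roughly like the following sketch; the NGramDriver class and its ngrams helper are simplified stand-ins for my actual corpus iteration and ngram extraction, only the calls into NGramLoader match the code above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical driver, not part of the class above.
public class NGramDriver {

    public static void main(String[] args) throws Exception {
        NGramLoader.initDirectories("20150201", 0);

        // In the real program the articles come from my corpus; this map is
        // only a stand-in so the sketch is self-contained.
        Map<String, String> articles = new LinkedHashMap<>();
        articles.put("article1", "This is a test sentence");

        for (Map.Entry<String, String> article : articles.entrySet()) {
            NGramLoader.load(article.getKey(),
                    ngrams(article.getValue(), 1),
                    ngrams(article.getValue(), 2),
                    ngrams(article.getValue(), 3));
        }

        NGramLoader.closeWriter();
    }

    // Naive whitespace-based ngram extraction, for illustration only.
    private static ArrayList<String> ngrams(String text, int n) {
        String[] words = text.split("\\s+");
        ArrayList<String> result = new ArrayList<>();
        for (int i = 0; i + n <= words.length; i++) {
            result.add(String.join(" ", Arrays.copyOfRange(words, i, i + n)));
        }
        return result;
    }
}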

The program starts out fast enough, but after roughly 100,000 articles it becomes unbearably slow. My assumption is that the writer is merging the data into a single SSTable on the fly, and that this slows things down dramatically once the number of items grows too large.

Any ideas on how to get around this?

Best Answer

So here is what helped. I noticed that the more records were created in the same output directory, the longer each import took. From what I could find, this is caused by the size of the index, which Cassandra has to rebuild on every write. I have not verified this, but from the results I have seen it makes sense. My solution was to re-create the table writer and start a new output directory, so that the index stays small and builds quickly.

It is not perfect, but it is a lot faster.

class CassandraLoader {

    private static final Logger logger = Logger.getLogger(CassandraLoader.class
            .getName());

    // After half a million records we will rotate the directory for efficiency
    private static final int MAX_RECORD_COUNT = 500000;

    private CQLSSTableWriter tableWriter;

    private final CsvIOFactory csvIOFactory = CsvIOFactory.createFactory(
            createCsvConfig(), AdBlockLog.class);

    private final CsvDeserializer deSerializer;

    private final String cqlKeySpace;

    private final String cqlTable;

    /**
     * This is the total number of output directories we have processed.
     */
    private int rolloverFileCount = 0;

    /**
     * Output directory name.
     */
    private String outputDirectory;

    /**
     * Constructor that initializes the output cql keyspace and the cql table where
     * the data needs to be stored.
     *
     * @param cqlKeySpace
     * @param cqlTable
     * @param outputDirectory
     */
    protected CassandraLoader(final String cqlKeySpace, final String cqlTable,
            final String outputDirectory) {
        this.cqlKeySpace = cqlKeySpace;
        this.cqlTable = cqlTable;
        this.outputDirectory = outputDirectory;
        // Create a new deserializer.
        deSerializer = csvIOFactory.createDeserializer();

        tableWriter = createTableWriter(outputDirectory, rolloverFileCount);
    }

    public int load(final String s3Bucket, final String s3Regex)
            throws InvalidRequestException, IllegalArgumentException,
            IllegalAccessException, IOException {

        int totalRecordCount = 0;
        int rolloverRecordCount = 0;

        logger.info("Loading files from bucket " + s3Bucket + " with regex "
                + s3Regex);

        final List<String> s3FileKeys = S3Util.listBucketToKeys(s3Bucket, s3Regex);
        logger.info("Found " + s3FileKeys.size() + " total s3 files");

        for (String s3fileKey : s3FileKeys) {
            logger.info("Processing file " + s3fileKey);
            int recordsProcessed = loadCsvFromS3Bulk(s3Bucket, s3fileKey);
            totalRecordCount += recordsProcessed;
            rolloverRecordCount += recordsProcessed;

            logger.info("Total Record Count " + totalRecordCount);
            logger.info("Rollover Record Count " + rolloverRecordCount);

            if (rolloverRecordCount >= MAX_RECORD_COUNT) {
                // Close the current writer and roll over to a fresh directory.
                tableWriter.close();
                tableWriter = createTableWriter(outputDirectory,
                        ++rolloverFileCount);
                rolloverRecordCount = 0;
            }
        }
        return totalRecordCount;
    }

    private int loadCsvFromS3Bulk(final String bucketName, final String key)
            throws IOException, InvalidRequestException,
            IllegalArgumentException, IllegalAccessException {

        // Have to close all of these
        InputStream s3InputStream = null;
        InputStream gzStream = null;
        InputStreamReader bufReader = null;

        int recordsProcessed = 0;

        try {
            s3InputStream = S3Util.getFileInputStream(bucketName, key);
            gzStream = new GZIPInputStream(s3InputStream);

            bufReader = new InputStreamReader(gzStream,
                    StandardCharsets.US_ASCII);

            deSerializer.open(bufReader);

            for (; deSerializer.hasNext(); recordsProcessed++) {
                AdBlockLog abl = deSerializer.next();
                tableWriter.addRow(getRowMap(abl));
            }

        } finally {
            deSerializer.close(true);
            bufReader.close();
            gzStream.close();
            s3InputStream.close();
        }

        MemoryUtils.printUsage(logger);
        return recordsProcessed;
    }

    public void close() throws IOException {
        tableWriter.close();
    }

    @VisibleForTesting
    protected Map<String, Object> getRowMap(final CassandraEntity casEntity)
            throws IllegalArgumentException, IllegalAccessException {

        Map<String, Object> rowMap = new HashMap<String, Object>();

        for (Field f : casEntity.getClass().getDeclaredFields()) {
            if (f.isAnnotationPresent(Column.class)) {
                Column columnAnnotation = f.getAnnotation(Column.class);
                Class<?> clazz = f.getType();
                f.setAccessible(true);

                logger.finest("adding column with class " + clazz.getName());
                if (clazz.isAssignableFrom(BigDecimal.class)) {
                    BigDecimal value = (BigDecimal) f.get(casEntity);
                    rowMap.put(columnAnnotation.name(), (value == null ? null
                            : value.doubleValue()));
                    continue;
                }
                if (clazz.isAssignableFrom(String.class)) {
                    String value = (String) f.get(casEntity);
                    // Store empty strings as null; this should save space.
                    rowMap.put(columnAnnotation.name(),
                            (value == null || value.isEmpty()) ? null : value);
                    continue;
                }
                // Anything other than BigDecimal or String we can just add as-is.
                rowMap.put(columnAnnotation.name(), f.get(casEntity));
            }
        }

        return rowMap;
    }

    /**
     * Create a new tableWriter. This is most important for doing rollover
     * to a new directory to increase speed and efficiency.
     *
     * The output will be stored in the same directory where the application is
     * being run, in the format of cqlKeySpace/outputDirectoryName_iteration
     *
     * ex.
     * s3dump/dt=2015-02-01_1
     *
     * @param outputDirectoryName The directory name that you want to write the output to
     * @param iteration The iteration that will be appended to the directory.
     * @return A newly created {@link CQLSSTableWriter}
     */
    private final CQLSSTableWriter createTableWriter(
            String outputDirectoryName, int iteration) {
        final String directoryName = String.format(cqlKeySpace + "/%s_%s",
                outputDirectoryName, Integer.toString(iteration));

        final File currentOutputDirectory = new File(directoryName);

        if (!currentOutputDirectory.exists()) {
            logger.info("Creating sstable directory "
                    + currentOutputDirectory.getName());
            currentOutputDirectory.mkdirs();
        }

        String schema = String.format(AdBlockLog.AD_BLOCK_LOG_SCHEMA,
                cqlKeySpace, cqlTable);

        String insert = String.format(AdBlockLog.AD_BLOCK_LOG_INSERT_STMT,
                cqlKeySpace, cqlTable);

        return CQLSSTableWriter.builder()
                .inDirectory(currentOutputDirectory.getAbsolutePath())
                .withPartitioner(new Murmur3Partitioner())
                .withBufferSizeInMB(128).forTable(schema).using(insert).build();
    }

    private static final CsvConfiguration createCsvConfig() {
        CsvConfiguration config = new CsvConfiguration();
        config.setFieldDelimiter(',');
        return config;
    }
}
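
Applied back to the question's NGramLoader, the same rollover idea boils down to something like the sketch below (unigrams shown). This is only an outline under my assumptions: the per-writer counter, the directory suffix and the 500,000 threshold are values I chose, not anything required by CQLSSTableWriter; UNIGRAM_SCHEMA, BASE_LOCATION and unigram_writer refer to the fields in the question's class.

// Rollover sketch for a single writer; rolloverCount, rowCount and
// MAX_RECORD_COUNT are additions, not part of the original NGramLoader.
private static int rolloverCount = 0;
private static long rowCount = 0;
private static final long MAX_RECORD_COUNT = 500000;

private static CQLSSTableWriter newUnigramWriter(String startdate, int iteration) {
    File dir = new File(BASE_LOCATION + "/" + startdate + "/articles/unigrams_" + iteration + "/");
    dir.mkdirs();
    return CQLSSTableWriter.builder()
            .inDirectory(dir.getAbsolutePath())
            .forTable(UNIGRAM_SCHEMA)
            .using("INSERT INTO articles.unigrams (docid, unigram) VALUES (?, ?)")
            .withBufferSizeInMB(128)
            .build();
}

private static void addUnigram(String startdate, String articleId, String unigram)
        throws IOException, InvalidRequestException {
    unigram_writer.addRow(articleId, unigram);
    if (++rowCount >= MAX_RECORD_COUNT) {
        // Close the current writer and start a fresh directory so the index
        // being built stays small.
        unigram_writer.close();
        unigram_writer = newUnigramWriter(startdate, ++rolloverCount);
        rowCount = 0;
    }
}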

Regarding "java - Loading large rows of data into Cassandra using Java and CQLSSTableWriter", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/28506947/
