I've been trying to load a very large dataset into Cassandra using Java's SSTableLoader API, but my program keeps timing out while doing so.
I'm breaking articles down into word n-grams (unigrams, bigrams, trigrams). I have a keyspace with three column families (unigrams, bigrams, trigrams). In each column family the row key is the document ID, and for every n-gram in that document a column containing that n-gram is added.
So for an article with the ID "article1" and the content "This is a test sentence", the row would look like:
row id | col | col | col | col | col
----------------------------------------------------
article1 | This | is | a | test | sentence
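As an aside, splitting a sentence into n-grams like this can be sketched with plain Java; this helper is illustrative and not part of the original program:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NGrams {
    // Produce all n-grams of the given size from a whitespace-tokenized sentence.
    static List<String> ngrams(String text, int n) {
        String[] tokens = text.split("\\s+");
        List<String> out = new ArrayList<>();
        for (int i = 0; i + n <= tokens.length; i++) {
            out.add(String.join(" ", Arrays.copyOfRange(tokens, i, i + n)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Bigrams of the example sentence above
        System.out.println(ngrams("This is a test sentence", 2));
        // prints [This is, is a, a test, test sentence]
    }
}
```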
Here is the Java file I use to set up the SSTableWriters and add data to them:
package cassandrabulktest.cassandra;
import static NGramProperties.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
public class NGramLoader {
private static final String UNIGRAM_SCHEMA = "CREATE TABLE articles.unigrams (" +
"docid text, " +
"unigram text, " +
"PRIMARY KEY (unigram, docid))";
private static CQLSSTableWriter unigram_writer;
private static final String BIGRAM_SCHEMA = "CREATE TABLE articles.bigrams (" +
"docid text, " +
"bigram text, " +
"PRIMARY KEY (bigram, docid))";
private static CQLSSTableWriter bigram_writer;
private static final String TRIGRAM_SCHEMA = "CREATE TABLE articles.trigrams (" +
"docid text, " +
"trigram text, " +
"PRIMARY KEY (trigram, docid))";
private static CQLSSTableWriter trigram_writer;
public static void initDirectories(String startdate, int count) {
String[] grams = { "unigrams", "bigrams", "trigrams" };
for (String gram : grams) {
File f = new File(BASE_LOCATION + "/" + startdate + "/articles/" + gram + "/");
f.mkdirs();
}
unigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/unigrams/")
.forTable(UNIGRAM_SCHEMA)
.using("INSERT INTO articles.unigrams (docid, unigram) VALUES (?, ?)")
.build();
bigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/bigrams/")
.forTable(BIGRAM_SCHEMA)
.using("INSERT INTO articles.bigrams (docid, bigram) VALUES (?, ?)")
.build();
trigram_writer = CQLSSTableWriter.builder()
.inDirectory(BASE_LOCATION + "/" + startdate + "/articles/trigrams/")
.forTable(TRIGRAM_SCHEMA)
.using("INSERT INTO articles.trigrams (docid, trigram) VALUES (?, ?)")
.build();
}
public static void load(String articleId, ArrayList<String> unigrams, ArrayList<String> bigrams, ArrayList<String> trigrams) throws IOException, InvalidRequestException {
for (String unigram : unigrams) {
// addRow's positional arguments must match the INSERT's bind order: (docid, ngram)
unigram_writer.addRow(articleId, unigram);
}
for (String bigram : bigrams) {
bigram_writer.addRow(articleId, bigram);
}
for (String trigram : trigrams) {
trigram_writer.addRow(articleId, trigram);
}
}
public static void closeWriter() throws IOException {
unigram_writer.close();
bigram_writer.close();
trigram_writer.close();
}
}
I call "load" for each article I iterate over; the ArrayLists are simply the lists of n-grams that need to be added.
The program starts out fast enough, but after around 100,000 articles it becomes unbearably slow. My assumption is that the writer is merging the data into a single SSTable on the fly, and as the number of items grows too large this slows things down drastically.
Any ideas on how to get around this?
Best Answer
Here is what helped. I noticed that the more records I created in the same output directory, the longer each import took. Looking around, I found this happens because of the size of the index: Cassandra has to rebuild it on every write. I haven't verified this yet, but from the results I've seen it makes sense. My solution is to periodically recreate the table writer and point it at a new directory, so the index stays small and rebuilds quickly.
It's not perfect, but it is much faster.
class CassandraLoader {
private static final Logger logger = Logger.getLogger(CassandraLoader.class
.getName());
// After half a million records we will rotate the directory for efficiency
private static final int MAX_RECORD_COUNT = 500000;
private CQLSSTableWriter tableWriter;
private final CsvIOFactory csvIOFactory = CsvIOFactory.createFactory(
createCsvConfig(), AdBlockLog.class);
private final CsvDeserializer deSerializer;
private final String cqlKeySpace;
private final String cqlTable;
/**
* This is the total number of output directories we have processed.
*/
private int rolloverFileCount = 0;
/**
* Output directory name.
*/
private String outputDirectory;
/**
* Constructor that initializes the output cql keyspace and the cql table where
* the data needed to be stored.
*
* @param cqlKeySpace
* @param cqlTable
* @param outputDirectory
*/
protected CassandraLoader(final String cqlKeySpace, final String cqlTable,
final String outputDirectory) {
this.cqlKeySpace = cqlKeySpace;
this.cqlTable = cqlTable;
this.outputDirectory = outputDirectory;
// Create a new deserializer.
deSerializer = csvIOFactory.createDeserializer();
tableWriter = createTableWriter(outputDirectory, rolloverFileCount);
}
public int load(final String s3Bucket, final String s3Regex)
throws InvalidRequestException, IllegalArgumentException,
IllegalAccessException, IOException {
int totalRecordCount = 0;
int rolloverRecordCount = 0;
logger.info("Loading files from bucket " + s3Bucket + " with regex "
+ s3Regex);
final List<String> s3FileKeys = S3Util.listBucketToKeys(s3Bucket, s3Regex);
logger.info("Found " + s3FileKeys.size() + " total s3 files");
for (String s3fileKey : s3FileKeys) {
logger.info("Processing file " + s3fileKey);
int recordsProcessed = loadCsvFromS3Bulk(s3Bucket, s3fileKey);
totalRecordCount += recordsProcessed;
rolloverRecordCount += recordsProcessed;
logger.info("Total Record Count " + totalRecordCount);
logger.info("Rollover Record Count " + rolloverRecordCount);
if (rolloverRecordCount >= MAX_RECORD_COUNT) {
tableWriter.close();
tableWriter = createTableWriter(outputDirectory,
++rolloverFileCount);
rolloverRecordCount = 0;
}
}
return totalRecordCount;
}
private int loadCsvFromS3Bulk(final String bucketName, final String key)
throws IOException, InvalidRequestException,
IllegalArgumentException, IllegalAccessException {
// Have to close all of these
InputStream s3InputStream = null;
InputStream gzStream = null;
InputStreamReader bufReader = null;
int recordsProcessed = 0;
try {
s3InputStream = S3Util.getFileInputStream(bucketName, key);
gzStream = new GZIPInputStream(s3InputStream);
bufReader = new InputStreamReader(gzStream,
StandardCharsets.US_ASCII);
deSerializer.open(bufReader);
for (; deSerializer.hasNext(); recordsProcessed++) {
AdBlockLog abl = deSerializer.next();
tableWriter.addRow(getRowMap(abl));
}
} finally {
deSerializer.close(true);
bufReader.close();
gzStream.close();
s3InputStream.close();
}
MemoryUtils.printUsage(logger);
return recordsProcessed;
}
public void close() throws IOException {
tableWriter.close();
}
@VisibleForTesting
protected Map<String, Object> getRowMap(final CassandraEntity casEntity)
throws IllegalArgumentException, IllegalAccessException {
Map<String, Object> rowMap = new HashMap<String, Object>();
for (Field f : casEntity.getClass().getDeclaredFields()) {
if (f.isAnnotationPresent(Column.class)) {
Column columnAnnotation = f.getAnnotation(Column.class);
Class<?> clazz = f.getType();
f.setAccessible(true);
logger.finest("adding column with class " + clazz.getName());
if (clazz.isAssignableFrom(BigDecimal.class)) {
BigDecimal value = (BigDecimal) f.get(casEntity);
rowMap.put(columnAnnotation.name(), (value == null ? null
: value.doubleValue()));
continue;
}
if (clazz.isAssignableFrom(String.class)) {
String value = (String) f.get(casEntity);
// Store empty strings as null; this should save space
rowMap.put(columnAnnotation.name(),
(value == null || value.isEmpty()) ? null : value);
// Skip the generic put below, which would otherwise overwrite this value
continue;
}
// Anything other than BigDecimal or String we can just add as-is.
rowMap.put(columnAnnotation.name(), f.get(casEntity));
}
}
return rowMap;
}
/**
* Create a new tableWriter. This is most important for doing rollover
* to a new directory to increase speed and efficiency.
*
* The output will be stored in the same directory where the application is
* being run, in the format cqlKeySpace/outputDirectoryName_iteration
*
* ex.
* s3dump/dt=2015-02-01_1
*
* @param outputDirectoryName The directory name that you want to write the output to
* @param iteration The iteration that will be appended to the directory.
* @return A newly created {@link CQLSSTableWriter}
*/
private final CQLSSTableWriter createTableWriter(
String outputDirectoryName, int iteration) {
final String directoryName = String.format(cqlKeySpace + "/%s_%s",
outputDirectoryName, Integer.toString(iteration));
final File currentOutputDirectory = new File(directoryName);
if (!currentOutputDirectory.exists()) {
logger.info("Creating sstable directory "
+ currentOutputDirectory.getName());
currentOutputDirectory.mkdirs();
}
String schema = String.format(AdBlockLog.AD_BLOCK_LOG_SCHEMA,
cqlKeySpace, cqlTable);
String insert = String.format(AdBlockLog.AD_BLOCK_LOG_INSERT_STMT,
cqlKeySpace, cqlTable);
return CQLSSTableWriter.builder()
.inDirectory(currentOutputDirectory.getAbsolutePath())
.withPartitioner(new Murmur3Partitioner())
.withBufferSizeInMB(128).forTable(schema).using(insert).build();
}
private static final CsvConfiguration createCsvConfig() {
CsvConfiguration config = new CsvConfiguration();
config.setFieldDelimiter(',');
return config;
}
}
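Stripped of the Cassandra and S3 details, the core of the fix is the rollover bookkeeping: accumulate a record count, and once it passes the threshold, close the current writer and open a new one in a fresh directory so the per-directory index stays small. The sketch below isolates just that logic; the names and the tiny threshold are illustrative, not taken from the answer:

```java
import java.util.ArrayList;
import java.util.List;

public class RolloverCounter {
    static final int MAX_RECORD_COUNT = 3; // tiny threshold, for illustration only

    // Given per-file record counts, return the directory index each file's rows go to.
    static List<Integer> assignDirectories(int[] fileCounts) {
        List<Integer> dirs = new ArrayList<>();
        int rolloverRecordCount = 0;
        int dirIndex = 0;
        for (int count : fileCounts) {
            dirs.add(dirIndex);            // this file is written with the current writer
            rolloverRecordCount += count;
            if (rolloverRecordCount >= MAX_RECORD_COUNT) {
                dirIndex++;                // close the writer, open a new one in a fresh directory
                rolloverRecordCount = 0;
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        System.out.println(assignDirectories(new int[]{2, 2, 1, 3}));
        // prints [0, 0, 1, 1]
    }
}
```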
Regarding "java - Loading large rows of data into Cassandra with Java and CQLSSTableWriter", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28506947/
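The reflection pattern in getRowMap above, walking the @Column-annotated fields of an entity to build the row map for addRow, can be reproduced in isolation. The annotation and entity below are stand-ins, not the answer's actual AdBlockLog class:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class RowMapper {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Column { String name(); }

    static class Entity {
        @Column(name = "docid") String docId = "article1";
        @Column(name = "score") int score = 42;
        String notAColumn = "skipped"; // no annotation, so it is ignored
    }

    // Build a column-name -> value map from the entity's annotated fields.
    static Map<String, Object> toRowMap(Object entity) throws IllegalAccessException {
        Map<String, Object> row = new HashMap<>();
        for (Field f : entity.getClass().getDeclaredFields()) {
            if (f.isAnnotationPresent(Column.class)) {
                f.setAccessible(true);
                row.put(f.getAnnotation(Column.class).name(), f.get(entity));
            }
        }
        return row;
    }

    public static void main(String[] args) throws IllegalAccessException {
        System.out.println(toRowMap(new Entity()));
    }
}
```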