gpt4 book ai didi

hadoop - 使用 map reduce 在 cassandra 中执行批量加载

转载 作者:可可西里 更新时间:2023-11-01 14:32:00 26 4
gpt4 key购买 nike

我没有太多使用 cassandra 的经验,所以如果我采用了错误的方法,请原谅。

我正在尝试使用 map reduce 在 cassandra 中进行批量加载

基本上是字数统计的例子

引用:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/

我已经放置了简单的 Hadoop Wordcount Mapper 示例,并根据上面的示例稍微修改了驱动程序代码和 reducer。

我也成功生成了输出文件。现在我的疑问是如何执行加载到 cassandra 部分?我的方法有什么不同吗?

请指教。

这是驱动代码的一部分

 Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);

Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");

job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;

Mapper 与普通的 wordcount mapper 相同,只是标记化并发出 Word,1

reducer类的形式是

public class CassaWordCountReducer extends 
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Mutation> columnsToAdd = new ArrayList<Mutation>();
Integer wordCount = 0;
for(IntWritable value : values) {
wordCount += value.get();
}
Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
countCol.setTimestamp(new Date().getTime());
ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
wordCosc.setColumn(countCol);
Mutation countMut = new Mutation();
countMut.column_or_supercolumn = wordCosc;
columnsToAdd.add(countMut);
context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
}
}

最佳答案

要对 Cassandra 进行批量加载,我建议查看 this article from DataStax .基本上你需要为批量加载做两件事:

  • 您的输出数据本身不适合 Cassandra,您需要将其转换为 SSTables。
  • 获得 SSTable 后,您需要能够将它们流式传输到 Cassandra 中。当然你不会简单的想把每个SSTable复制到每个节点,你只想复制相关部分的数据到每个节点

在您使用 BulkOutputFormat 的情况下,它应该像在幕后使用 sstableloader 一样执行所有操作。我从未将它与 MultipleOutputs 一起使用,但它应该可以正常工作。

我认为你的情况的错误是你没有正确使用MultipleOutputs:你还在做一个context.write,而你真的应该写到您的 MultipleOutputs 对象。你现在这样做的方式,因为你正在写入常规 Context,它会被 TextOutputFormat 的默认输出格式而不是那个您在 MultipleOutputs 中定义。有关如何在 reducer 中使用 MultipleOutputs 的更多信息 here .

一旦你像你定义的那样写入正确的 BulkOutputFormat 输出格式,你的 SSTables 应该被创建并从集群中的每个节点流式传输到 Cassandra - 你不需要任何额外的步骤,输出格式将为您处理。

我还建议查看 this post ,他们还解释了如何使用 BulkOutputFormat,但他们使用的是 ConfigHelper,您可能想查看它以更轻松地配置您的 Cassandra 端点。

关于hadoop - 使用 map reduce 在 cassandra 中执行批量加载,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14700543/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com