
hadoop - Mapper going infinite with DataStax Cassandra 1.2.1


I have only one row in my Cassandra column family. When I run the MapReduce job, the mapper keeps reading that same row over and over, so the map phase never finishes and the reducer stays stuck.

These are the configuration settings used:

conf.set("fs.default.name", "hdfs://28.151.181.107:9000");
conf.set("mapred.job.tracker", "28.151.181.107:9001");
conf.setJar("C:\\hadoop-test\\demo\\target\\demo-0.0.1-SNAPSHOT.jar");

conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(ReducerToFilesystem.class);
conf.setReducerClass(ReducerToFilesystem.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);

FileOutputFormat.setOutputPath(conf, new Path(resultFileName));

conf.setInputFormat(ColumnFamilyInputFormat.class);

ConfigHelper.setInputRpcPort(conf, PORT + "");
ConfigHelper.setInputInitialAddress(conf, HOST);
ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY,true);
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(conf, predicate);
ConfigHelper.setOutputInitialAddress(conf, HOST);
ConfigHelper.setOutputPartitioner(conf, "RandomPartitioner");
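
For reference, the snippet above pulls classes from Hadoop's old mapred API and from Cassandra's hadoop and thrift packages; a sketch of the imports it needs (package names as of Cassandra 1.2):

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;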

和Mapper&Reducer是
public static class TokenizerMapper extends MapReduceBase implements
        Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, Text> {
    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {

        DateSerializer sz = new DateSerializer();
        StringSerializer s = new StringSerializer();

        // Emit one record per column: row key -> "columnName [] [] value"
        for (IColumn col : columns.values()) {
            Date name = sz.fromByteBuffer(col.name());
            double value = ByteBufferUtil.toDouble(col.value());
            paramOutputCollector.collect(new Text(s.fromByteBuffer(key)),
                    new Text(name.toGMTString() + " [] [] " + value));
        }
    }
}


// MapReduceBase provides the no-op configure() and close() the old API requires
public static class ReducerToFilesystem extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> paramOutputCollector,
            Reporter paramReporter) throws IOException {
        // Concatenate all values for this key, separated by "<--->"
        StringBuffer bfr = new StringBuffer();
        while (values.hasNext()) {
            Text val = values.next();
            bfr.append(val);
            bfr.append("<--->");
        }
        paramOutputCollector.collect(key, new Text(bfr.toString()));
    }
}
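
For completeness, a minimal sketch of how a job built on this old (org.apache.hadoop.mapred) API is typically submitted; the driver class name here is hypothetical:

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class CassandraJobDriver { // hypothetical driver class
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(CassandraJobDriver.class);
        // ... apply all of the settings shown above ...
        JobClient.runJob(conf); // submits the job and blocks until it finishes
    }
}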

Please advise.

Thanks for your help!

Best Answer

I've debugged this a bit and I think you're right: the pagination is not done correctly, even in version 1.2.9.
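
A possible workaround (my assumption, not part of the original answer): the open-ended slice with widerows = true is what exercises the wide-row paging path, so one could try disabling wide-row paging and putting an explicit upper bound on the slice:

// Hedged workaround sketch: pass widerows = false and cap the slice count.
// Only safe if no row has more columns than the cap.
ConfigHelper.setInputColumnFamily(conf, KEY_SPACE, COLUMN_FAMILY, false);

SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
sliceRange.setCount(10000); // upper bound on columns fetched per row
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(conf, predicate);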

Regarding hadoop - Mapper going infinite with DataStax Cassandra 1.2.1, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/16572235/
