
java - Inserting into a Cassandra table with a composite primary key from a Hadoop reduce

Repost · Author: 可可西里 · Updated: 2023-11-01 15:37:11

I'm running a MapReduce job with Apache Hadoop and Cassandra that reads from one Cassandra table and writes its output to another Cassandra table.

I have jobs that output to tables with a single-column primary key. For example, this table for counting occurrences of each word has a single key:

    CREATE TABLE word_count(
        word text,
        count int,
        PRIMARY KEY(word)
    ) WITH COMPACT STORAGE;

The associated reduce class looks a bit like this:

public static class ReducerToCassandra
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    public void reduce(Text word, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        org.apache.cassandra.thrift.Column c
            = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count"));
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}

If I want to add an extra column, I just add another Mutation to the List<Mutation> the reduce already outputs. What I can't figure out is how to output to a table where the new column is part of a composite primary key. For example, this table works like the one above, but also indexes words by their publication hour:

    CREATE TABLE word_count(
        word text,
        publication_hour bigint,
        count int,
        PRIMARY KEY(word, publication_hour)
    ) WITH COMPACT STORAGE;

I've tried several different approaches, such as outputting a custom WritableComparable (containing a word and an hour) and updating the class and method signatures and the job configuration accordingly, but that makes the reduce throw a ClassCastException when it tries to cast the custom WritableComparable to ByteBuffer.
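For reference, the custom key described above might look roughly like this. This is a hypothetical sketch: the field and accessor names are illustrative, and the real class would implement org.apache.hadoop.io.WritableComparable<WordHourPair> and serialize its fields in write(DataOutput)/readFields(DataInput), which are omitted here to stay dependency-free.

```java
// Hypothetical sketch of the WordHourPair key; the real class would implement
// org.apache.hadoop.io.WritableComparable<WordHourPair>.
class WordHourPair implements Comparable<WordHourPair> {
    private String word;
    private long hour;

    WordHourPair() { }                 // Hadoop needs a no-arg constructor

    WordHourPair(String word, long hour) {
        this.word = word;
        this.hour = hour;
    }

    String getWordText() { return word; }
    long getHourLong()   { return hour; }

    // Order by word, then by hour, so the shuffle sorts keys deterministically.
    @Override
    public int compareTo(WordHourPair other) {
        int cmp = word.compareTo(other.word);
        return cmp != 0 ? cmp : Long.compare(hour, other.hour);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordHourPair)) return false;
        WordHourPair p = (WordHourPair) o;
        return word.equals(p.word) && hour == p.hour;
    }

    @Override
    public int hashCode() {
        return 31 * word.hashCode() + Long.hashCode(hour);
    }

    public static void main(String[] args) {
        WordHourPair a = new WordHourPair("apple", 1L);
        WordHourPair b = new WordHourPair("apple", 2L);
        System.out.println(a.compareTo(b) < 0);   // prints true: same word, earlier hour
    }
}
```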

I've also tried using a Builder to build an appropriate composite key:

public static class ReducerToCassandra
    //              MappedKey    MappedValue  ReducedKey  ReducedValues
    extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
{
    // MappedKey Values with the key wordHourPair
    public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values,
                       Context context)
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        long hour = wordHourPair.getHourLong();

        org.apache.cassandra.thrift.Column c
            = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count"));
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        // New code
        List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>();
        keyTypes.add(UTF8Type.instance);
        keyTypes.add(LongType.instance);
        CompositeType compositeKey = CompositeType.getInstance(keyTypes);

        Builder builder = new Builder(compositeKey);
        builder.add(ByteBufferUtil.bytes(wordHourPair.getWordText())); // accessor on the custom key
        builder.add(ByteBufferUtil.bytes(hour));

        ByteBuffer keyByteBuffer = builder.build();
        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}

But this throws an IOException:

java.io.IOException: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)
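The validation failure is consistent with how CQL3 maps this table onto Thrift: with PRIMARY KEY (word, publication_hour), only word is the partition (row) key, and the table validates row keys as UTF-8 text; publication_hour becomes part of the column name instead. A CompositeType blob frames every component with a two-byte length prefix and a trailing end-of-component byte, so it is generally not valid UTF-8. A dependency-free sketch of that framing (plain java.nio; the class and method names are mine, not Cassandra's):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class CompositeLayoutDemo {
    // Encode one component the way Cassandra's CompositeType does:
    // 2-byte big-endian length, the raw bytes, then a 0x00 end-of-component byte.
    static ByteBuffer component(byte[] raw) {
        ByteBuffer b = ByteBuffer.allocate(2 + raw.length + 1);
        b.putShort((short) raw.length);
        b.put(raw);
        b.put((byte) 0);
        b.flip();
        return b;
    }

    // Concatenate the framed components into one composite blob.
    static ByteBuffer composite(byte[]... parts) {
        int total = 0;
        for (byte[] p : parts) total += 2 + p.length + 1;
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] p : parts) out.put(component(p));
        out.flip();
        return out;
    }

    public static void main(String[] args) {
        byte[] word = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] hour = ByteBuffer.allocate(8).putLong(13L).array();
        ByteBuffer key = composite(word, hour);
        System.out.println(key.remaining()); // prints 19: (2+5+1) + (2+8+1)
    }
}
```

The length prefixes and 0x00 separators are exactly the bytes that fail the UTF8Type validation of the row key, hence "String didn't validate."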

This question, Cassandra CQL3 composite key not written by Hadoop reducer, seems to show the kind of code I'm looking for, but it calls context.write with parameters of type HashMap and ByteBuffer, and I'm not sure how to make context.write accept those parameters.

How can I get the data I want (word/hour keys, int values) into my table?

Best Answer

The answer is to use Cassandra's CQL interface instead of the Thrift API.

I can now write to a table with a composite key by declaring my reduce class's output key/value classes as Map<String, ByteBuffer> and List<ByteBuffer>, then creating a Map for the composite key in which each key (a String) is a column name and each value (a ByteBuffer) is the column's value converted with ByteBufferUtil.

For example, to write to a table defined like this:

CREATE TABLE foo (
    customer_id uuid,
    time timestamp,
    my_value int,
    PRIMARY KEY (customer_id, time)
)

I would write:

String customerID = "the customer's id";
long time = DateTime.now().getMillis();
int myValue = 1;

Map<String, ByteBuffer> key = new HashMap<String, ByteBuffer>();
key.put("customer_id", ByteBufferUtil.bytes(customerID));
key.put("time", ByteBufferUtil.bytes(time));

List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(myValue));

context.write(key, values);
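For completeness, the job also has to be pointed at the CQL output format, which the snippet above doesn't show. Below is a hedged configuration sketch; the class and helper names are from Cassandra 2.x's org.apache.cassandra.hadoop packages, and "my_keyspace", "foo", and the contact point are placeholders, so verify everything against your Cassandra version:

```java
// Hedged sketch of the job wiring for the CQL output path (Cassandra 2.x names).
job.setOutputKeyClass(Map.class);       // Map<String, ByteBuffer>: primary-key columns
job.setOutputValueClass(List.class);    // List<ByteBuffer>: bound variables
job.setOutputFormatClass(CqlOutputFormat.class);

ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "my_keyspace", "foo");
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

// Only the SET clause is supplied; the WHERE clause on the primary key is
// generated from the Map the reducer writes, and the "?" markers are bound,
// in order, from the List<ByteBuffer>.
CqlConfigHelper.setOutputCql(job.getConfiguration(),
    "UPDATE my_keyspace.foo SET my_value = ?");
```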

For this question (java - inserting into a Cassandra table with a composite primary key from a Hadoop reduce), a similar question was found on Stack Overflow: https://stackoverflow.com/questions/23395171/
