
cassandra - Apache Spark and Cassandra behavior

Reposted. Author: 行者123. Updated: 2023-12-03 06:54:17

I am writing a standalone Spark program that fetches data from Cassandra. Following the examples, I created an RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but when I call the RDD's .groupByKey() method I get a NotSerializableException:

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf, "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf, "Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
        ctx.newAPIHadoopRDD(jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

Exception:

java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
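The class named in the trace, java.nio.HeapByteBuffer, does not implement java.io.Serializable, so Spark's default JavaSerializer rejects it when it shuffles keys for groupByKey(). This can be reproduced with plain JDK serialization, no Spark needed (a minimal illustrative sketch, not code from the original post):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

public class ByteBufferSerializationDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap("row-key".getBytes("UTF-8"));
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            // Spark's JavaSerializer does essentially this for every shuffled key.
            out.writeObject(buffer);
            System.out.println("serialized OK");
        } catch (NotSerializableException e) {
            // ByteBuffer (concretely HeapByteBuffer here) does not implement
            // java.io.Serializable, so this branch is taken.
            System.out.println("NotSerializableException: " + e.getMessage());
        } finally {
            out.close();
        }
    }
}
```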

What I want to do is merge all the columns for each row key into a single entry. I get the same exception when I try the reduceByKey() method:

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
                SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

I am using:

  • spark-1.0.0-bin-hadoop1
  • Cassandra 1.2.12
  • Java 1.6

Does anyone know what the problem is? What causes the serialization to fail?

Thanks,

Best answer

Your problem is most likely caused by trying to serialize the ByteBuffers. They are not serializable, and you need to convert them to byte arrays before generating the RDD.
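A minimal sketch of that conversion (ByteBufferUtil and toArray are illustrative names, not from the original post) copies the buffer's remaining bytes into a plain byte[], which Java serialization handles natively:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferUtil {
    // Copies the buffer's remaining bytes into a byte[] without disturbing
    // the original buffer's position (works on a duplicate view).
    public static byte[] toArray(ByteBuffer buffer) {
        ByteBuffer dup = buffer.duplicate();
        byte[] bytes = new byte[dup.remaining()];
        dup.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        byte[] key = toArray(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        System.out.println(Arrays.toString(key)); // [1, 2, 3]
    }
}
```

You would apply such a helper in a mapToPair() step on the RDD before calling groupByKey(). One caveat: byte[] uses identity-based equals()/hashCode() in Java, so if the converted values are used as grouping keys, a content-comparable key (for example a hex-encoded String) may be the safer choice.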

You should try the official DataStax Cassandra driver for Spark, which is available here

Regarding cassandra - Apache Spark and Cassandra behavior, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24411658/
