
java - Spark serialization error: When I insert Spark Stream data into HBase


I'm confused about how Spark interacts with HBase in terms of data format. For example, when I omit the "ERROR" line in the code snippet below, it runs well... but when that line is added, I get a "Task not serializable" error.

How should I change the code, and why does the error occur?

My code is as follows:

// HBase configuration: connect to ZooKeeper and open the target table
Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
HConnection hconn = HConnectionManager.createConnection(hconfig);
HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));

// KAFKA configuration
Set<String> topics = Collections.singleton(topic);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2222");
kafkaParams.put("group.id", "tag_topic_id");

// Spark Streaming: create a direct Kafka stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) throws IOException {

        ////////////// Put into HBase : ERROR /////////////////////
        String[] data = x.split(",");

        if (null != data && data.length > 2) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String ts = sdf.format(new Date());

            Put put = new Put(Bytes.toBytes(ts));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

            htable.put(put); // ***** ERROR ********
            htable.close();
        }
        return Arrays.asList(COLDELIM.split(x)).iterator();
    }
});

records.print();

ssc.start();
ssc.awaitTermination();

When I start my application, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
at org.apache.spark.streaming.dstream.DStream.flatMap(DStream.scala:553)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.flatMap(JavaDStreamLike.scala:172)
at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.flatMap(JavaDStreamLike.scala:42)

Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)

Best Answer

The serialization debugger gives you the hint here:

Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)

Spark serializes the flatMap closure and ships it to the executors. Because the anonymous FlatMapFunction captures the htable field, Spark tries to serialize the HTable instance as well, and HTable wraps a live connection, so it is not serializable. Move the section below inside the FlatMapFunction's call method, before the point where you use the table in the closure, so the connection and table are created on the executor instead of being captured from the driver. That should solve the issue:

Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
HConnection hconn = HConnectionManager.createConnection(hconfig);
HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));
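
Applied to the code from the question, the closure would look roughly like the sketch below. This is a minimal illustration, not the answer's verbatim code; it assumes the same deprecated HBase client API used above (HConnectionManager, HConnection, HTable) and that tableName, familyName, and COLDELIM are serializable fields or constants:

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) throws IOException {
        String[] data = x.split(",");

        if (null != data && data.length > 2) {
            // Create the configuration, connection, and table inside the closure,
            // so they are instantiated on the executor instead of being serialized
            // from the driver.
            Configuration hconfig = HBaseConfiguration.create();
            hconfig.set("hbase.zookeeper.property.clientPort", "2222");
            hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
            HConnection hconn = HConnectionManager.createConnection(hconfig);
            HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));

            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String ts = sdf.format(new Date());

            Put put = new Put(Bytes.toBytes(ts));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

            htable.put(put);
            htable.close();
            hconn.close();
        }
        return Arrays.asList(COLDELIM.split(x)).iterator();
    }
});

Note that opening a connection per record is expensive; in practice the usual pattern is to write from foreachRDD with one connection per partition (foreachPartition). The sketch above is simply the smallest change that removes the serialization error.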

Regarding "java - Spark serialization error: When I insert Spark Stream data into HBase", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41756332/
