
java - Writing to HBase from Spark

Reposted · Author: 可可西里 · Updated: 2023-11-01 15:57:14

The flow in my Spark program is as follows:

Driver --> HBase connection created --> broadcast the HBase handle. On the executors, we then fetch this handle and try to write to HBase.

In the driver, I create the HBase conf object and the connection object, then broadcast it through the JavaSparkContext, like this:

    SparkConf sparkConf = JobConfigHelper.getSparkConfig();

    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);

    jsc = new JavaStreamingContext(sparkConf,
            Durations.milliseconds(Long.parseLong(batchDuration)));

    Configuration hconf = HBaseConfiguration.create();
    hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
    UserGroupInformation.setConfiguration(hconf);

    JavaSparkContext js = jsc.sparkContext();
    Connection connection = ConnectionFactory.createConnection(hconf);
    connectionbroadcast = js.broadcast(connection);

Inside the executor's call() method:

    Table table = connectionbroadcast.getValue()
            .getTable(TableName.valueOf("gfttsdgn:FRESHHBaseRushi"));

    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
    table.put(p);

I get the following exception when trying to run in yarn-client mode:

17/03/02 09:19:38 ERROR yarn.ApplicationMaster: User class threw exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1337)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:639)
at com.citi.fresh.core.driver.FreshDriver.main(FreshDriver.java:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
at java.util.Vector$Itr.next(Vector.java:1133)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
... 28 more
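The root cause at the bottom of the trace is a ConcurrentModificationException from java.util.Vector: broadcasting the Connection drags in its Configuration, whose classLoader field points at the AppClassLoader, and Kryo then tries to iterate that classloader's internal Vector of loaded classes while the JVM is still loading classes and mutating it. Vector's fail-fast iterator throws as soon as it detects the change. The stand-alone sketch below (class and list contents are illustrative, not from the original job) reproduces just that fail-fast behavior:

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Vector;

public class ComodDemo {
    // Returns true if the fail-fast iterator throws once the
    // Vector is structurally modified behind its back.
    static boolean failsFast() {
        Vector<String> loaded = new Vector<>();
        loaded.add("ClassA");
        loaded.add("ClassB");
        Iterator<String> it = loaded.iterator(); // iterator snapshots modCount
        it.next();
        loaded.add("ClassC");                    // structural change after iterator creation
        try {
            it.next();                           // comodification check fires here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fail-fast triggered: " + failsFast());
    }
}
```

This is why the failure happens at broadcast time in the driver, before any executor code runs: the serialization itself races against ordinary class loading.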

Best Answer

I see that you are trying to do bulk puts into HBase with Spark. As @jojo_Berlin explained, your HBase conf is not thread-safe, so the connection cannot safely be serialized and broadcast. However, you can achieve this easily using SparkOnHBase:

Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkPut(rdd, TableName.valueOf("gfttsdgn:FRESHHBaseRushi"), new PutFunction(), true);

where your Put function is:

public static class PutFunction implements Function<String, Put> {
    public Put call(String v) throws Exception {
        Put put = new Put(Bytes.toBytes(v));
        put.add(Bytes.toBytes("c1"), Bytes.toBytes("output"),
                Bytes.toBytes("rohan"));
        return put;
    }
}
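If the SparkOnHBase / hbase-spark module is not available, the usual workaround is simply not to broadcast the Connection at all: create it inside foreachPartition on the executor, so nothing HBase-related ever needs to be Kryo-serialized. A minimal sketch, assuming rdd is a JavaRDD<String> of row keys and reusing the question's table and column names (addColumn is the non-deprecated replacement for put.add in HBase 1.x+):

```java
rdd.foreachPartition(rows -> {
    // Created on the executor: never serialized, one connection per partition.
    Configuration hconf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(hconf);
         Table table = conn.getTable(TableName.valueOf("gfttsdgn:FRESHHBaseRushi"))) {
        while (rows.hasNext()) {
            Put put = new Put(Bytes.toBytes(rows.next()));
            put.addColumn(Bytes.toBytes("c1"), Bytes.toBytes("output"),
                    Bytes.toBytes("rohan"));
            table.put(put);
        }
    }
});
```

Opening one connection per partition (rather than per record) keeps the overhead low while avoiding the serialization problem entirely.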

Regarding "java - Writing to HBase from Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/42558798/
