gpt4 book ai didi

apache-spark - Spark 只写入一个 hbase 区域服务器

转载 作者:行者123 更新时间:2023-12-01 11:21:33 25 4
gpt4 key购买 nike

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.PairRDDFunctions

def bulkWriteToHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sinkTableName: String, outRDD: RDD[(ImmutableBytesWritable, Put)]): Unit = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sinkTableName)

val hJob = Job.getInstance(hConf)
hJob.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, sinkTableName)
hJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

outRDD.saveAsNewAPIHadoopDataset(hJob.getConfiguration())
}

我通过使用这个 hbase 批量插入发现,每次 spark 只会从 hbase 写入一个单一的区域服务器,这成为了瓶颈。

然而,当我使用几乎相同的方法但从 hbase 读取时,它使用多个执行程序进行并行读取。

def bulkReadFromHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sourceTableName: String) = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sourceTableName)

val inputRDD = sparkContext.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
inputRDD
}

can anyone please explain why this could happen? or maybe I have used the wrong way for spark-hbase bulk I/O ?

最佳答案

Question : I have used the wrong way for spark-hbase bulk I/O ?

虽然您的方法不对,但您需要事先预拆分区域并创建包含预拆分区域的表。

例如 create 'test_table', 'f1', SPLITS=> ['1', '2', '3', '4', '5', '6', '7', ' 8', '9']

上表占用9个区域..

用will从1-9开始设计好的rowkey

您可以像下面这样使用 guava murmur hash。

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

/**
* getMurmurHash.
*
* @param content
* @return HashCode
*/
public static HashCode getMurmurHash(String content) {
final HashFunction hf = Hashing.murmur3_128();
final HashCode hc = hf.newHasher().putString(content, Charsets.UTF_8).hash();
return hc;
}

final long hash = getMurmur128Hash(Bytes.toString(yourrowkey as string)).asLong();
final int prefix = Math.abs((int) hash % 9);

现在将此前缀附加到您的行键

For example

1rowkey1 // will go in to first region
2rowkey2 // will go in to second region
3rowkey3 // will go in to third region ... 9rowkey9 // will go in to ninth region

如果您正在进行预拆分,并且想要手动管理区域拆分,您还可以通过将 hbase.hregion.max.filesize 设置为较大的数字并将拆分策略设置为 ConstantSizeRegionSplitPolicy 来禁用区域拆分。但是,您应该使用 100GB 之类的保护值,这样区域的增长就不会超出区域服务器的能力。您可以考虑禁用自动拆分并依赖于预拆分的初始区域集,例如,如果您对键前缀使用统一哈希,并且您可以确保每个区域的读/写负载区域及其大小在表中的区域之间是统一的

1) 请确保在将数据加载到 hbase 表之前可以预拆分表 2) 使用 murmurhash 或其他一些哈希技术设计良好的 rowkey,如下所述。确保跨区域统一分配。
另请参阅 http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/

Question : can anyone please explain why this could happen?

原因非常明显和简单由于该表的 rowkey 不佳而将数据热点定位到一个特定原因...

考虑 java 中的一个 hashmap,它的元素的 hashcode 为 1234。那么它将填充一个桶中的所有元素不是吗?如果 hashmap 元素分布在不同的良好 hashcode 中,那么它将把元素放在不同的桶中。 hbase也是如此。这里你的哈希码就像你的行键......

更进一步,

What happens if I already have a table and I want to split the regions across...

RegionSplitter类提供了几个实用程序来帮助选择手动拆分区域而不是让 HBase 自动处理的开发人员的管理生命周期。

最有用的实用程序是:

  • 创建具有指定数量的预拆分区域的表
  • 对现有表上的所有区域执行滚动拆分

示例:

$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1

其中-c 10,指定请求的region个数为10,-f指定表中你想要的列族,以“:”分隔。该工具将创建一个名为“test_table”的表,其中包含 10 个区域:

13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...

as discussed in comment, you found that my final RDD right before writing into hbase only has 1 partition! which indicates that there was only one executor holding the entire data... I am still trying to find out why.

另外,检查

spark.default.parallelism defaults to the number of all cores on all machines. The parallelize api has no parent RDD to determine the number of partitions, so it uses the spark.default.parallelism.

所以你可以通过重新分区来增加分区。

注意:我观察到,在 Mapreduce 中,区域的分区数量/输入拆分 = 启动的映射器数量。类似地,在您的情况下,数据加载到一个特定区域的情况可能是相同的,这就是为什么一个执行者发射。也请验证一下

关于apache-spark - Spark 只写入一个 hbase 区域服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42030653/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com