gpt4 book ai didi

hadoop - 使用 pyspark 流式传输到 HBase

转载 作者:可可西里 更新时间:2023-11-01 14:25:33 28 4
gpt4 key购买 nike

网上有大量关于使用 Scala 使用 Spark 流批量加载到 HBase 的信息(these two 特别有用)和一些关于 Java 的信息,但似乎缺乏相关信息与 PySpark。所以我的问题是:

  • 如何使用 PySpark 将数据批量加载到 HBase?
  • 大多数示例在任何语言中都只显示每行被更新的一列。如何在每行中插入多列?

我目前的代码如下:

if __name__ == "__main__":

context = SparkContext(appName="PythonHBaseBulkLoader")
streamingContext = StreamingContext(context, 5)

stream = streamingContext.textFileStream("file:///test/input");

stream.foreachRDD(bulk_load)

streamingContext.start()
streamingContext.awaitTermination()

我需要帮助的是批量加载功能

def bulk_load(rdd):
#???

我之前取得了一些进展,但有许多不同的错误(如记录 herehere )

最佳答案

经过多次试验和错误,我在这里展示了我想到的最好的。它运行良好,并成功地批量加载数据(使用 Puts 或 HFiles)我完全愿意相信这不是最好的方法,因此欢迎任何评论/其他答案。这里假设您使用 CSV 格式存储数据。

使用 Puts 批量加载

到目前为止,批量加载最简单的方法是为 CSV 中的每个单元格创建一个 Put 请求,并将它们排队到 HBase。

def bulk_load(rdd):
#Your configuration will likely be different. Insert your own quorum and parent node and table name
conf = {"hbase.zookeeper.qourum": "localhost:2181",\
"zookeeper.znode.parent": "/hbase-unsecure",\
"hbase.mapred.outputtable": "Test",\
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\#Split the input into individual lines
.flatMap(csv_to_key_value)#Convert the CSV line to key value pairs
load_rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)

csv_to_key_value 函数是魔法发生的地方:

def csv_to_key_value(row):
cols = row.split(",")#Split on commas.
#Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
#Works well for n>=1 columns
result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
(cols[0], [cols[0], "f2", "c2", cols[2]]),
(cols[0], [cols[0], "f3", "c3", cols[3]]))
return result

我们之前定义的值转换器会将这些元组转换为 HBase Put

使用 HFiles 批量加载

使用 HFile 进行批量加载效率更高:不是为每个单元格发出 Put 请求,而是直接写入 HFile,然后简单地告知 RegionServer 指向新的 HFile。这将使用 Py4J,因此在 Python 代码之前我们必须编写一个小的 Java 程序:

import py4j.GatewayServer;
import org.apache.hadoop.hbase.*;

public class GatewayApplication {

public static void main(String[] args)
{
GatewayApplication app = new GatewayApplication();
GatewayServer server = new GatewayServer(app);
server.start();
}
}

编译并运行它。只要您的流式传输正在进行,就让它运行。现在更新 bulk_load 如下:

def bulk_load(rdd):
#The output class changes, everything else stays
conf = {"hbase.zookeeper.qourum": "localhost:2181",\
"zookeeper.znode.parent": "/hbase-unsecure",\
"hbase.mapred.outputtable": "Test",\
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"}

keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
.flatMap(csv_to_key_value)\
.sortByKey(True)
#Don't process empty RDDs
if not load_rdd.isEmpty():
#saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
"org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
conf=conf,
keyConverter=keyConv,
valueConverter=valueConv)
#The file has now been written, but HBase doesn't know about it

#Get a link to Py4J
gateway = JavaGateway()
#Convert conf to a fully fledged Configuration type
config = dict_to_conf(conf)
#Set up our HTable
htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
#Set up our path
path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
#Get a bulk loader
loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
#Load the HFile
loader.doBulkLoad(path, htable)
else:
print("Nothing to process")

最后,相当简单的 dict_to_conf:

def dict_to_conf(conf):
gateway = JavaGateway()
config = gateway.jvm.org.apache.hadoop.conf.Configuration()
keys = conf.keys()
vals = conf.values()
for i in range(len(keys)):
config.set(keys[i], vals[i])
return config

如您所见,使用 HFile 进行批量加载比使用 Put 更复杂,但根据您的数据加载,这可能是值得的,因为一旦您开始使用它,它就不那么困难了。

关于让我措手不及的最后一点注意事项:HFiles 期望它们接收到的数据按词汇顺序写入。这并不总是保证是真的,尤其是因为“10”<“9”。如果您将 key 设计为唯一的,则可以轻松解决此问题:

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
.flatMap(csv_to_key_value)\
.sortByKey(True)#Sort in ascending order

关于hadoop - 使用 pyspark 流式传输到 HBase,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35077986/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com