
scala - Spark Structured Streaming integration with HBase

Reposted. Author: 行者123. Updated: 2023-12-03 16:31:57

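We are doing stream processing on Kafka data that was collected from MySQL. Once all the analytics are done, I want to save the data directly to HBase. I have gone through the Spark Structured Streaming documentation but could not find any HBase sink. The code I use to read the data from Kafka is below.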

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._ // enables the $"column" syntax

    val records = spark.readStream.format("kafka")
      .option("subscribe", "kaapociot")
      .option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667")
      .option("startingOffsets", "earliest")
      .load()

    // Outer message: {"header": ..., "event": ...}; "event" itself holds a JSON string
    val jsonschema = StructType(Seq(StructField("header", StringType, true), StructField("event", StringType, true)))
    val uschema = StructType(Seq(
      StructField("MeterNumber", StringType, true),
      StructField("Utility", StringType, true),
      StructField("VendorServiceNumber", StringType, true),
      StructField("VendorName", StringType, true),
      StructField("SiteNumber", StringType, true),
      StructField("SiteName", StringType, true),
      StructField("Location", StringType, true),
      StructField("timestamp", LongType, true),
      StructField("power", DoubleType, true)))
    val DF_Hbase = records
      .selectExpr("cast(value as string) as json")
      .select(from_json($"json", jsonschema).as("data"))
      .select("data.event")
      .select(from_json($"event", uschema).as("mykafkadata"))
      .select("mykafkadata.*")

Finally, I want to save the DF_Hbase DataFrame to HBase.

Best answer

1- Add these libraries to your project (sbt):

    libraryDependencies ++= Seq(
      "org.apache.hbase" % "hbase-client" % "2.0.1",
      "org.apache.hbase" % "hbase-common" % "2.0.1"
    )

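The Kafka source used in the question (`format("kafka")`) is not part of `spark-sql` itself; it ships in the separate `spark-sql-kafka-0-10` connector. A minimal `build.sbt` sketch for the Spark side, assuming Spark 2.4.x (the version number is an assumption; match it and the Scala version to your cluster):

```scala
// build.sbt fragment (sketch): the version below is an assumption, not from the original answer
val sparkVersion = "2.4.8"

libraryDependencies ++= Seq(
  // Provided by the cluster at runtime when submitting with spark-submit
  "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
  // Kafka source/sink for Structured Streaming; must be packaged with the job
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```

If you submit with `spark-submit --packages` instead, the connector can be left out of the assembly.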
2- Add this trait to your code:
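    import java.util.concurrent.ExecutorService
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
    import org.apache.hadoop.hbase.security.User
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.spark.sql.ForeachWriter

    // Generic ForeachWriter that writes every record of a streaming Dataset to HBase.
    // Subclasses supply the table name, the HBase config files and the record -> Put mapping.
    trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

      val tableName: String
      val hbaseConfResources: Seq[String]

      // Optional thread pool and user for the HBase connection (default: none)
      def pool: Option[ExecutorService] = None

      def user: Option[User] = None

      private var hTable: Table = _
      private var connection: Connection = _

      // Called on the executor for each partition and epoch. HBase connections are
      // not serializable, so they are created here instead of being shipped from the driver.
      override def open(partitionId: Long, version: Long): Boolean = {
        connection = createConnection()
        hTable = getHTable(connection)
        true
      }

      def createConnection(): Connection = {
        val hbaseConfig = HBaseConfiguration.create()
        hbaseConfResources.foreach(r => hbaseConfig.addResource(r))
        ConnectionFactory.createConnection(hbaseConfig, pool.orNull, user.orNull)
      }

      def getHTable(connection: Connection): Table = {
        connection.getTable(TableName.valueOf(tableName))
      }

      // Writes one Put per record
      override def process(record: RECORD): Unit = {
        val put = toPut(record)
        hTable.put(put)
      }

      // Null checks keep close() safe even if open() did not create the connection
      override def close(errorOrNull: Throwable): Unit = {
        if (hTable != null) hTable.close()
        if (connection != null) connection.close()
      }

      // Maps a record to an HBase Put (row key plus columns)
      def toPut(record: RECORD): Put

    }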
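The trait above sends one `Table.put` round trip per record. A minimal sketch of a batching variant, assuming hbase-client's `BufferedMutator` API (available since HBase 1.0); the class name `HBaseBufferedForeachWriter` and the 4 MB buffer size are hypothetical choices, not part of the original answer:

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutator, BufferedMutatorParams, Connection, ConnectionFactory, Put}
import org.apache.spark.sql.ForeachWriter

// Hypothetical batching alternative to HBaseForeachWriter: Puts are buffered
// client-side and sent in batches instead of one RPC per record.
abstract class HBaseBufferedForeachWriter[RECORD](
    tableName: String,
    hbaseConfResources: Seq[String],
    writeBufferBytes: Long = 4L * 1024 * 1024 // assumed buffer size; tune for your cluster
) extends ForeachWriter[RECORD] {

  @transient private var connection: Connection = _
  @transient private var mutator: BufferedMutator = _

  // Same contract as in the trait: map a record to an HBase Put
  def toPut(record: RECORD): Put

  override def open(partitionId: Long, version: Long): Boolean = {
    val conf = HBaseConfiguration.create()
    hbaseConfResources.foreach(r => conf.addResource(r))
    connection = ConnectionFactory.createConnection(conf)
    val params = new BufferedMutatorParams(TableName.valueOf(tableName))
      .writeBufferSize(writeBufferBytes)
    mutator = connection.getBufferedMutator(params)
    true
  }

  // Only queues the Put; it is sent when the buffer fills or on close()
  override def process(record: RECORD): Unit = mutator.mutate(toPut(record))

  override def close(errorOrNull: Throwable): Unit = {
    // BufferedMutator.close() flushes pending mutations before releasing resources
    if (mutator != null) mutator.close()
    if (connection != null) connection.close()
  }
}
```

The trade-off: buffered Puts only reach HBase when the buffer fills or `close()` flushes it at the end of the partition's epoch, so a task that dies mid-epoch loses its unflushed writes. Spark then re-runs that epoch, and re-writing the same row keys and columns simply overwrites them with a newer cell version.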

3- Use it in your logic:
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes

    val ds = .... // any Dataset[WhatEverYourDataType], e.g. DF_Hbase (a Dataset[Row])

    val query = ds.writeStream
      .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
        override val tableName: String = "hbase-table-name"
        // Your cluster config files; here they are assumed to be on the classpath (e.g. in resources)
        override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml")

        override def toPut(record: WhatEverYourDataType): Put = {
          val key = .....
          val columnFamilyName: String = ....
          val columnName: String = ....
          val columnValue = ....

          val p = new Put(Bytes.toBytes(key))
          // Add columns ...
          p.addColumn(Bytes.toBytes(columnFamilyName),
            Bytes.toBytes(columnName),
            Bytes.toBytes(columnValue))

          p
        }
      })
      .start()

    query.awaitTermination()
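The `....` placeholders in `toPut` depend on your table design. As an illustration only, here is one possible mapping for the `DF_Hbase` rows from the question. The object name `MeterRowToPut`, the column family `"d"` and the row key `<MeterNumber>-<timestamp>` are hypothetical choices; the column family must already exist in your HBase table.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row

// Sketch of a toPut body for DF_Hbase rows. Assumptions (hypothetical):
//   - row key = "<MeterNumber>-<timestamp>"
//   - every non-null field goes into one column family "d",
//     with the field name as the column qualifier
object MeterRowToPut {
  val ColumnFamily: Array[Byte] = Bytes.toBytes("d") // hypothetical family name

  def toPut(row: Row): Put = {
    val meter = row.getAs[String]("MeterNumber")
    val tsIdx = row.fieldIndex("timestamp")
    require(meter != null && !row.isNullAt(tsIdx), "MeterNumber and timestamp must be non-null")
    val put = new Put(Bytes.toBytes(s"$meter-${row.getLong(tsIdx)}"))

    row.schema.fieldNames.zipWithIndex.foreach { case (name, i) =>
      if (!row.isNullAt(i)) { // null fields are skipped, not stored
        // Encode by runtime type so numbers keep HBase's binary encoding
        val value: Array[Byte] = row.get(i) match {
          case s: String => Bytes.toBytes(s)
          case l: Long   => Bytes.toBytes(l)
          case d: Double => Bytes.toBytes(d)
          case other     => Bytes.toBytes(other.toString)
        }
        put.addColumn(ColumnFamily, Bytes.toBytes(name), value)
      }
    }
    put
  }
}
```

In step 3 you would use `Row` as the record type and let `toPut` delegate to `MeterRowToPut.toPut(record)`. Since `require` fails the task on a null key, you may prefer to drop such rows first, e.g. `DF_Hbase.filter($"MeterNumber".isNotNull && $"timestamp".isNotNull)`.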

Regarding "scala - Spark Structured Streaming integration with HBase", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47152015/
