gpt4 book ai didi

apache-spark - 将单个 DStream 拆分为多个 Hive 表

转载 作者:行者123 更新时间:2023-12-03 15:54:29 24 4
gpt4 key购买 nike

我正在研究 Kafka Spark 流项目。 Spark 流从 Kafka 获取数据。数据为json格式。样本输入

{ "table": "tableA", "Product_ID": "AGSVGF.upf", "file_timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "hdfs_file_name": "null_1532631600050", "Date_Time": "2018-07-26T13:45:01.0000000Z", "User_Name": "UBAHTSD" }

{ "table": "tableB", "Test_ID": "FAGS.upf", "timestamp": "2018-07-26T18:58:08.4485558Z000000000000000", "name": "flink", "time": "2018-07-26T13:45:01.0000000Z", "Id": "UBAHTGADSGSCVDGHASD" }



一个 JSON 字符串就是一条消息。有 15 种类型的 JSON 字符串,它们使用表列来区分。现在我想在 Apache Hive 中保存这 15 个不同的 JSON。所以我创建了一个 dstream 并在表列的基础上过滤了 rdd 并保存到 Hive 中。代码工作正常。但是有些时间很多它表很多时间然后触发批处理。我已经使用 spark.streaming.kafka.maxRatePerPartition=10 控制了输入.我已将 rdd 重新分区为 9 个分区,但在 Spark UI 上,它显示未知阶段。
enter image description here

这是我的代码。
val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)
dStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val sparkContext = rdd.sparkContext
rdd.persist(StorageLevel.MEMORY_AND_DISK)
val hiveContext = getInstance(sparkContext)
val tableA = rdd.filter(_.contains("tableA"))
if (!tableA.isEmpty()) {
HiveUtil.tableA(hiveContext.read.json(tableA))
tableA.unpersist(true)
}

val tableB = rdd.filter(_.contains("tableB"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableB))
tableB.unpersist(true)
}
.....
.... upto 15 tables
....

val tableK = rdd.filter(_.contains("tableK"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableK))
tableB.unpersist(true)
}

}

}

我如何优化代码?

谢谢你。

最佳答案

纯粹从管理角度来看,我建议您将作业参数化以接受表名,然后运行 ​​15 个单独的 Spark 应用程序。还要保证每个应用的kafka消费组是不同的

这样,您可以更轻松地监控哪个 Spark 作业的性能不如其他作业,并且一个表的数据倾斜不会导致其他表出现问题。

目前尚不清楚 Kafka 消息键是什么,但是如果以表为键生成,那么 Spark 可以与 kafka 分区一起扩展,并且您可以保证每个表的所有消息都是有序的。

总的来说,我实际上会使用 Kafka Connect 或 Streamsets 来写入 HDFS/Hive,而不必编写代码或调整 Spark 设置

关于apache-spark - 将单个 DStream 拆分为多个 Hive 表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51579105/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com