gpt4 book ai didi

scala - Spark with Kafka streaming 保存到 Elastic search 性能低下

转载 作者:行者123 更新时间:2023-11-29 02:52:20 24 4
gpt4 key购买 nike

我有一个数据列表,值基本上是一个 bson 文档(想想 json),每个 json 的大小从 5k 到 20k 不等。可以是bson对象格式,也可以直接转成json:

Key, Value
--------
K1, JSON1
K1, JSON2
K2, JSON3
K2, JSON4

我希望 groupByKey 会产生:

K1, (JSON1, JSON2)
K2, (JSON3, JSON4)

所以当我这样做时:

val data = [...].map(x => (x.Key, x.Value))
val groupedData = data.groupByKey()
groupedData.foreachRDD { rdd =>
//the elements in the rdd here are not really grouped by the Key
}

我对 RDD 的行为感到很困惑。我在互联网上阅读了很多文章,包括 Spark 的官方网站:https://spark.apache.org/docs/0.9.1/scala-programming-guide.html

还是达不到我想要的。

--------更新--------------------

基本上我真的需要按key来分组,key就是要在Elasticsearch中使用的索引,这样我就可以通过Elasticsearch for Hadoop根据key进行批处理:

EsSpark.saveToEs(rdd);

我不能按分区做,因为 Elasticsearch 只接受 RDD。我尝试使用 sc.MakeRDD 或 sc.parallize,两者都告诉我它不可序列化。

我尝试使用:

EsSpark.saveToEs(rdd, Map(
"es.resource.write" -> "{TheKeyFromTheObjectAbove}",
"es.batch.size.bytes" -> "5000000")

配置文件在这里:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

但与不使用配置来根据单个文档的值定义动态索引相比,它非常慢,我怀疑它正在解析每个 json 以动态获取值。

最佳答案

这是例子。

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Test extends App {

val session: SparkSession = SparkSession
.builder.appName("Example")
.config(new SparkConf().setMaster("local[*]"))
.getOrCreate()
val sc = session.sparkContext

import session.implicits._

case class Message(key: String, value: String)

val input: Seq[Message] =
Seq(Message("K1", "foo1"),
Message("K1", "foo2"),
Message("K2", "foo3"),
Message("K2", "foo4"))

val inputRdd: RDD[Message] = sc.parallelize(input)

val intermediate: RDD[(String, String)] =
inputRdd.map(x => (x.key, x.value))
intermediate.toDF().show()
// +---+----+
// | _1| _2|
// +---+----+
// | K1|foo1|
// | K1|foo2|
// | K2|foo3|
// | K2|foo4|
// +---+----+

val output: RDD[(String, List[String])] =
intermediate.groupByKey().map(x => (x._1, x._2.toList))
output.toDF().show()
// +---+------------+
// | _1| _2|
// +---+------------+
// | K1|[foo1, foo2]|
// | K2|[foo3, foo4]|
// +---+------------+

output.foreachPartition(rdd => if (rdd.nonEmpty) {
println(rdd.toList)
})
// List((K1,List(foo1, foo2)))
// List((K2,List(foo3, foo4)))

}

关于scala - Spark with Kafka streaming 保存到 Elastic search 性能低下,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49291315/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com