gpt4 book ai didi

Elasticsearch 到 Spark Streaming

转载 作者:行者123 更新时间:2023-12-04 03:44:32 25 4
gpt4 key购买 nike

我正在分析日志,我有这样的架构:

kafka-> Spark 流-> Elasticsearch

我的主要目标是在流媒体中创建机器学习模型。我认为我可以做两件事:

1) Kafka->spark Streaming (ML) -> Elasticsearch

2) Kafka->spark Streaming->elasticsearch->spark Streaming(ML)

- 我认为第二种架构是最好的,因为 Spark 流将直接使用索引数据。你怎么认为?那是对的吗?
- 我们可以轻松地将 Spark 流实时连接到 elasticsearch 吗?
-如果我们在 Spark 流中创建模型(在 Elasticsearch 之后)我们必须在这个地方使用这个模型(在 Elasticsearch 之后)还是我们可以在 Spark 流中使用它(kafka之后的目录)? #use==实时预测
- 在 Elasticsearch 使我们的模型静态之后创建模型(或不是实时方法)

谢谢你。

最佳答案

你是那个意思?

卡夫卡-> Spark 流-> Elasticsearch 数据库

val sqlContext = new SQLContext(sc)

//kafka group
val group_id = "receiveScanner"
// kafka topic
val topic = Map("testStreaming"-> 1)
// zk connect
val zkParams = Map(
"zookeeper.connect" ->"localhost",
"zookeeper.connection.timeout.ms" -> "10000",
"group.id" -> group_id)

// Kafka
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
val receiveData = kafkaConsumer.map(_._2 )
// printer kafka data
receiveData.print()
receiveData.foreachRDD{ rdd=>
val transform = rdd.map{ line =>
val data = Json.parse(line)
// play json parse
val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
val name = ( data \ "name" ).asOpt[String] match { case Some(x)=> x ; case None => "" }
val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
val address = ( data \ "address" ).asOpt[String] match { case Some(x)=> x ; case None => "" }
Row(id,name,age,address)
}

val transfromrecive = sqlContext.createDataFrame(transform,schameType)
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
//filter age < 20 , to ES database
transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
.saveToEs("member/user",Map("es.mapping.id" -> "id"))
}

}

/**
* 数据帧方案
* */
def schameType =  StructType(
StructField("id",IntegerType,false)::
StructField("name",StringType,false)::
StructField("age",IntegerType,false)::
StructField("address",StringType,false)::
Nil
)

关于Elasticsearch 到 Spark Streaming,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43889053/

25 4 0