gpt4 book ai didi

apache-spark - 如何将多个 ML 管道(模型)应用于同一个 Spark 流

转载 作者:行者123 更新时间:2023-12-05 06:23:06 24 4
gpt4 key购买 nike

我有一个用例,我必须在同一个 spark 流(从 kafka 获取)上应用多个已经训练好的模型(例如 M1、M2、..Mn)。

模型是使用此处的隔离森林算法训练的: https://github.com/titicaca/spark-iforest

我在这里找到了与我的案例类似的东西 https://www.youtube.com/watch?v=EhRHQPCdldI , 但不幸的是我不知道 Genesys 公司(以前的 AltoCloud)是否开源了这个 API(StreamPipeline,Heterogenous Pipeline)。

我用上面的模式代码处理了这个,但我不知道最优是多少。

//read the stream
val kafkaStreamDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", "topic")
.load
val myModels = Array("m1", "m2","m3","m4")
//parallize the input models in order to have multiple threads handling the same stream, otherwise blocked??
myModels.par.foreach(lm => {

//load the model
val model = PipelineModel.load(lm)

kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
//apply model
val pdf = model.transform(batchDF).selectExpr("CAST(to_json(struct(*)) AS STRING) AS value").write
.format("json")
.save("anom/" + lm + System.currentTimeMillis())
}).start().awaitTermination()
})

问题:1. 因此,我想知道是否有任何 Spark API 可以处理这样的用例?

  1. 如果是,我在哪里可以找到它?

  2. 如果不是,我该如何最佳地实现它?

非常感谢任何想法和建议。

最佳答案

据我所知,它可以像 this 那样完成.... 但是,如果您的处理时间多于接收时间怎么办?消息将堆积起来,这将导致流式接收速度变慢。 iForest使用树结构来建模数据。完成算法需要一些时间。

我更喜欢像 hdfs 分区明智地存储在存储中......并以固定时间间隔的批处理方式在其上应用 ML。这样您就可以毫不延迟地接收消息并有效地处理它们。

关于apache-spark - 如何将多个 ML 管道(模型)应用于同一个 Spark 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58643624/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com