
python - How to apply an MLflow prediction model in a stream?


I have a stream reading prepared feature data to feed into an already registered model. All of the code is in Python. The model and the code below run fine outside the stream, in a regular notebook; inside the stream it is another matter. The main problem is that the data written from the stream (to the target table) has NULL predictions. Another issue is that the foreachBatch function does not seem to respond at all, even to deliberately planted syntax errors; nothing in the logs or the notebook feedback indicates a problem. It is as if the function is never called.

I realize I am writing to the table twice (once in the function and once in the writeStream). Only one set of records comes through, from the writeStream, not from the function.

The code is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import struct
import mlflow
import mlflow.pyfunc

spark = SparkSession \
    .builder \
    .appName("MyTest") \
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream \
    .format("delta") \
    .option('ignoreDeletes', 'true') \
    .table("schema.transformeddata")

fixedValueStream = lines.select('feature1','feature2', 'feature3')


# Score each micro-batch with the registered MLflow model
def batchpredictions(df, epoch_id):
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
    prediction.write.mode("append").saveAsTable("schema.transformeddata_prediction")


fixedValueStream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .foreachBatch(batchpredictions) \
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") \
    .table("schema.transformeddata_prediction")

Incoming data:

feature1, feature2, feature3
1 , 5 , 9
2 , 6 , 10
3 , 7 , 11
4 , 8 , 12

Outgoing data:

feature1, feature2, feature3, prediction
1 , 5 , 9 , NULL
2 , 6 , 10 , NULL
3 , 7 , 11 , NULL
4 , 8 , 12 , NULL

Any clues as to what I am doing wrong?

*Update: thanks to Mike for the response. My goal is to start optimizing the solution below using some of what you suggested; for now I just needed to get something working to build on. The solution in its current state is below.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
import mlflow
import mlflow.xgboost
import xgboost
import numpy as np
import pandas as pd
from pyspark.sql.types import *

# Load the model as an MLflow PyFuncModel (not as a Spark UDF)
loaded_model = mlflow.pyfunc.load_model('runs:/<mymodelrunid>/model')

spark = SparkSession \
    .builder \
    .appName("MyTest") \
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream \
    .format("delta") \
    .option('ignoreDeletes', 'true') \
    .table("<myschema>.<mytableinput>")

fixedValueStream = lines.select('feature1','feature2', 'feature3', 'feature4', 'feature5')

def foreach_batch_function(df, epoch_id):
    # Text value of the multiclass prediction: GREEN, RED, BLUE
    df = df.withColumn("pred_class", lit(' '))

    # Prepare 3 holders for the 3 class scores returned from the multiclass model.
    # Done beforehand so I don't have to deal with data type/additional column index/key issues.
    df = df.withColumn("prediction_class1", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class2", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class3", lit(0.00).cast("double"))

    # Select back into a pandas frame
    pd_df = df.select('feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                      'pred_class', 'prediction_class1', 'prediction_class2',
                      'prediction_class3').toPandas()

    # Pass the pandas frame into the model; returns an array of shape [<batch-df-rows-count>][3]
    y_pred = loaded_model.predict(pd_df)

    # Take the index of the max class score per row
    predicted_idx = np.argmax(y_pred, axis=1)

    # Translate said index into end-user labels
    y_pred_class = np.where(predicted_idx == 1, 'GREEN', np.where(predicted_idx == 0, 'RED', 'BLUE'))

    # Assign the class to the placeholder column
    pd_df["pred_class"] = y_pred_class

    # Store the 3 prediction strengths in the placeholder columns
    pd_df["prediction_class1"] = y_pred[:, 0]
    pd_df["prediction_class2"] = y_pred[:, 1]
    pd_df["prediction_class3"] = y_pred[:, 2]

    # Write back out to a monitoring table
    result = spark.createDataFrame(pd_df)
    result.write.option("mergeSchema", "true").format("delta").option("header", "true").mode("append").saveAsTable("<myschema>.<mytableoutput>")

# Write the stream out
fixedValueStream.writeStream.foreachBatch(foreach_batch_function).start()
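As a quick illustration of the label-mapping step in the function above, here is the same argmax/np.where logic on a toy score array (the values are made up):

import numpy as np

# Toy 4x3 array of class scores, shaped like the output of loaded_model.predict
y_pred = np.array([[0.1, 0.7, 0.2],
                   [0.8, 0.1, 0.1],
                   [0.2, 0.2, 0.6],
                   [0.3, 0.5, 0.2]])

predicted_idx = np.argmax(y_pred, axis=1)   # -> array([1, 0, 2, 1])
y_pred_class = np.where(predicted_idx == 1, 'GREEN',
                        np.where(predicted_idx == 0, 'RED', 'BLUE'))
# -> array(['GREEN', 'RED', 'BLUE', 'GREEN'])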

Best Answer

As @AlexOtt pointed out in the comments, for the problem as you have currently written it, there is no need to use foreachBatch at all.

All you need to do is apply the UDF to your streaming DataFrame with withColumn.
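A minimal sketch of that approach, assuming the same model URI, feature columns, and target table as in your question:

import mlflow
from pyspark.sql.functions import struct

pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')

# Apply the model directly to the streaming DataFrame; no foreachBatch needed
scored = fixedValueStream.withColumn(
    "prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))

# Delta is a streamable sink, so the stream can write straight to the table
scored.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") \
    .toTable("schema.transformeddata_prediction")  # toTable needs Spark 3.1+; on older versions, .start() with a path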


If you really do need to use foreachBatch, perhaps because you are writing out to a non-streamable sink format, you can read below how to do it.

Looking at the documentation on foreachBatch in the Structured Streaming Programming Guide, you do not need the format or the outputMode in your final writeStream; instead, the logic for writing the data is defined inside the foreachBatch function. In addition, using saveAsTable in a stream also does not look right.
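For instance, a sink with no streaming support, such as a JDBC database, can only be reached through the batch writer inside foreachBatch. A minimal sketch, with a hypothetical PostgreSQL table and placeholder connection details:

import mlflow
from pyspark.sql.functions import struct

def score_to_jdbc(df, epoch_id):
    # Score the micro-batch, then use the ordinary batch writer,
    # since JDBC has no streaming sink
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    scored = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
    scored.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://<host>:5432/<db>") \
        .option("dbtable", "predictions") \
        .option("user", "<user>") \
        .option("password", "<password>") \
        .mode("append") \
        .save()

fixedValueStream.writeStream.foreachBatch(score_to_jdbc).start()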

For your Delta case, your code should look like this overall:

def batchpredictions(df, epoch_id):
    # Score the micro-batch and write it out; the write logic lives here,
    # not in the writeStream below
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
    prediction.write.mode("append").format("delta").save("/tmp/delta-table")


fixedValueStream.writeStream \
    .foreachBatch(batchpredictions) \
    .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") \
    .start()

Regarding python - How to apply an MLflow prediction model in a stream?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67269002/
