- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个流读取准备将特征数据发布到一个已经注册的模型中。所有代码都在 Python 中。以下模型和元数据在常规笔记本中的流之外运行。在流中是另一回事。主要问题是从流中写入的数据(写入目标表)具有 NULL 预测。另一件事是 foreachBatch
函数似乎没有响应,甚至对故意植入的语法错误也是如此。日志或笔记本反馈中没有迹象表明这是一个问题。就好像没有被调用一样。
我意识到我正在向表写入两次(一次在函数中,一次在 writeStream
中。只有一条记录来自 writeStream
- 而不是功能。
代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("MyTest") \
.getOrCreate()
# Create a streaming DataFrame
lines = spark.readStream \
.format("delta") \
.option('ignoreDeletes','true') \
.table("schema.transformeddata")
fixedValueStream = lines.select('feature1','feature2', 'feature3')
# Split the lines into words
def batchpredictions(df, epoch_id):
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1','feature2','feature3')))
prediction.write.mode("append").saveAsTable("schema.transformeddata_prediction")
fixedValueStream.writeStream.format("delta").outputMode("append").foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("schema.transformeddata_prediction")
传入数据:
feature1, feature2, feature3
1 , 5 , 9
2 , 6 , 10
3 , 7 , 11
4 , 8 , 12
传出数据
feature1, feature2, feature3, prediction
1 , 5 , 9 , NULL
2 , 6 , 10 , NULL
3 , 7 , 11 , NULL
4 , 8 , 12 , NULL
关于我做错了什么的任何线索?
*更新:感谢 Mike 的回复。我的目标是使用您建议的一些内容开始优化下面的解决方案。现在我只需要找到一些可以工作的东西来依靠。当前状态下的解决方案如下。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
import mlflow
import mlflow.xgboost
import xgboost
import numpy as np
import pandas as pd
from pyspark.sql.types import *
# Load model as a PysparkUDF
loaded_model = mlflow.pyfunc.load_model('runs:/<mymodelrunid>/model')
spark = SparkSession \
.builder \
.appName("MyTest") \
.getOrCreate()
# Create a streaming DataFrame
lines = spark.readStream \
.format("delta") \
.option('ignoreDeletes','true') \
.table("<myschema>.<mytableinput>")
fixedValueStream = lines.select('feature1','feature2', 'feature3', 'feature4', 'feature5')
def foreach_batch_function(df, epoch_id):
#text value of the multi class prediction GREEN, RED, BLUE
df = df.withColumn("pred_class", lit(' '))
#Prepare 3 holders for the 3 class scores returned from multiclass model.
#Done before hand so I don't have to deal with data type/additional column index/key issues.
df = df.withColumn("prediction_class1", lit(0.00).cast("double"))
df = df.withColumn("prediction_class2", lit(0.00).cast("double"))
df = df.withColumn("prediction_class3", lit(0.00).cast("double"))
#Select back into pandas frame
pd_df = df.select('feature1','feature2', 'feature3', 'feature4', 'feature5','pred_class','prediction_class1','prediction_class2','prediction_class3').toPandas()
#Pass pandas frame into model and return array of shape [<batch-df-rows-count>][3]
y_pred = loaded_model.predict(pd_df)
#Retun the max column score
predicted_idx = np.argmax(y_pred, axis=1)
#Translate said column into end user labels
y_pred_class = np.where(predicted_idx == 1, 'GREEN', np.where(predicted_idx == 0, 'RED', 'BLUE' ))
#Assign class to place holder column
pd_df["pred_class"] = y_pred_class
#Store the 3 prediction strengths into place holder columns
pd_df["prediction_class1"] = y_pred[:,0]
pd_df["prediction_class2"] = y_pred[:,1]
pd_df["prediction_class3"] = y_pred[:,2]
#Write out back to a monitoring table
result = spark.createDataFrame(pd_df)
result.write.option("mergeSchema","true").format("delta").option("header", "true").mode("append").saveAsTable("<myschema>.<mytableoutput>")
#write stream out
fixedValueStream.writeStream.foreachBatch(foreach_batch_function).start()
最佳答案
正如@AlexOtt 在评论中指出的那样,就您当前编写的问题而言,无需应用 foreachBatch
。
您需要做的就是使用 withColumn
将 UDF 应用到您的流数据帧。
如果您确实需要使用 foreachBatch
,可能是因为您正在写入不可流式接收器格式,您可以阅读下面的操作方法。
查看 Structured Streaming Programming Guidelines 中关于 foreachBatch
的文档,您不需要在最终的 writeStream 中使用 format
和 outputMode
。相反,写入数据的逻辑是在 foreachBatch 函数中定义的。此外,在流中使用 saveAsTable
看起来也不正确。
总体而言,您的代码应如下所示:
def batchpredictions(df, epoch_id):
# Split the lines into words
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
prediction.write.mode("append").format("delta").save("/tmp/delta-table")
fixedValueStream.writeStream.foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").start()
关于python - 如何在流中应用 MLFlow 预测模型?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67269002/
我想在不使用 mlflow ui 的情况下向体验笔记添加文本 mlflow UI notes example 我在文档中找不到该方法 https://mlflow.org/docs/latest/tr
我目前正在跟踪我的 MLflow 运行到本地文件路径 URI。我还想设置一个远程跟踪服务器与我的合作者共享。我想避免的一件事是将所有内容都记录到服务器,因为它可能很快就会被失败的运行淹没。 理想情况下
MLFlow Tracking 非常适合监控实验,但我想知道 MLFlow 或其他开源平台上是否有解决方案可以集成来监控数据和模型漂移。 有一个post来自 Databricks 展示了如何使用 De
有没有办法在 MLflow 的实验级别管理权限?我们想要一个共享服务器,但希望能够在实验级别管理权限 - 例如管理员可以查看所有实验,user_group1 可以管理实验 1 - 也许不同的组可以查看
我还没有找到在第一次 start_run 之后为该运行设置运行名称的方法(我们可以在那里传递一个名称)。 我知道我们可以使用标签,但那不是一回事。我想添加一个与运行相关的名称,但通常我们只有在运行评估
我发现删除了 run仅从 active 更改状态至 deleted ,因为如果通过 deleted 搜索,运行仍然在 UI 中可见。 . 是否可以删除 run从UI节省空间? 删除运行时,是否也删除了
我正在使用 mlflow REST API 编写一个库。我正在寻找用于记录不同 mlflow 模型的 mlflow REST api。 在文档中,https://www.mlflow.org/docs
我正在尝试在远程计算机上将 MLFlow 跟踪服务器设置为 systemd 服务。我有一个运行的 sftp 服务器并创建了一个 SSH key 对。 除了工件日志记录之外,一切似乎都运行良好。 MLF
我尝试以这种方式读取指标: data, info = mlflow.get_run(run_id) print(data[1].metrics) # example of output: {'l
我可以使用以下命令创建 ml 模型服务器 mlflow models serve -m file:///C:/Users/SawarkarFamily/Desktop/mlflow-master/ex
我正在尝试在本地网络中的另一台机器上运行 MLFlow,我想寻求一些帮助,因为我现在不知道该怎么做。 我有一个运行在服务器上的 mlflow 服务器。 mlflow 服务器在服务器上的我的用户下运行,
我正在尝试使用 MLFlow CLI 运行 MLFlow 项目,但按照教程进行操作会导致错误。对于我尝试从 CLI 运行的任何项目,我都会收到以下错误 Traceback (most recent c
我正在尝试在本地运行 mlflow 服务器。 为此,我正在使用: mlflow server --backend-store-uri="sqlite:///C:\\path\\to\\project_
我正在尝试将基于 tensorflow (keras) 的模型实现到 mlflow 中,同时了解它的工作原理以及它是否满足我们的需求。我正在尝试从 tensorflow 网站实现 Fashion MN
我已使用 MLProject 将我的模型存储在 Github 中,以便其他人可以以各种变体运行它。现在我想将运行创建的模型记录为工件,以便用户可以使用 MLModel 加载它们。由于我的模型是定制的,
我有一个流读取准备将特征数据发布到一个已经注册的模型中。所有代码都在 Python 中。以下模型和元数据在常规笔记本中的流之外运行。在流中是另一回事。主要问题是从流中写入的数据(写入目标表)具有 NU
MLflow 运行完成后,外部脚本可以使用 python mlflow 客户端和 mlflow.get_run(run_id) 方法访问其参数和指标,但是 get_run 返回的 Run 对象似乎是只
我希望通过在 keras 模型的训练/测试阶段的每个时期之后存储相应指标的每个值,将损失和准确性等指标视为图表。 PS:我知道我们可以通过使用 mlflow for keras 的自动日志功能来做到这
我想从不同的文件夹运行 mlflow ui。默认情况下,它会在我的用户文件夹中创建一个名为“mlruns”的文件夹。如果创建在使用特定工作目录的 Jupiter-Notebook 中运行,则会在该目录
当我将我的整个模型和参数记录到 mlflow 中时,我认为将其置于用户名和密码下进行保护是个好主意。 我使用以下代码运行mlflow服务器 mlflow server --host 0.0.0.0 -
我是一名优秀的程序员,十分优秀!