- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
有没有办法以矢量化方式(使用pandas_udf?)在pyspark数据帧上运行pytorch模型的推理。
一行 udf 非常慢,因为需要为每一行加载模型 state_dict()。我正在尝试使用 pandas_udf 来加快速度,因为所有操作都可以在 pandas/pytorch 中有效地矢量化。
我已经查看了这个 databricks 帖子以获得灵感,但它与我的用例并不完全对应,因为我想对现有的 pyspark 数据框运行预测。
在这个简单的例子中,我可以使用一行 udf 让它工作:
import torch
import torch.nn as nn
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, udf
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, DoubleType
import pandas as pd
import numpy as np
spark = SparkSession.builder.master('local[*]') \
.appName("model_training") \
.getOrCreate()
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.w = nn.Linear(5, 1)
def forward(self, x):
return self.w(x)
net = Net()
bc_model_state = spark.sparkContext.broadcast(net.state_dict())
df = spark.sparkContext.parallelize([[np.random.rand() for i in range(5)] for j in range(10)]).toDF()
df = df.withColumn('features', F.array([F.col(f"_{i}") for i in range(1, 6)]))
def get_model_for_eval():
# Broadcast the model state_dict
net.load_state_dict(bc_model_state.value)
net.eval()
return net
def one_row_predict(x):
model = get_model_for_eval()
t = torch.tensor(x, dtype=torch.float32)
prediction = model(t).cpu().detach().item()
return prediction
one_row_udf = udf(one_row_predict, FloatType())
df = df.withColumn('pred_one_row', one_row_udf(col('features')))
df.show()
+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------+
| _1| _2| _3| _4| _5| features|pred_one_row|
+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------+
| 0.8447505355266759| 0.3938414671838497|0.46347383092447003| 0.7694022276208854| 0.6152606009215115|[0.84475053552667...| 0.025048971|
|0.023782157504950607| 0.6434186254505012| 0.4090423037706754| 0.5466917794921007| 0.7855157903802007|[0.02378215750495...| 0.19694215|
| 0.5057589877333257| 0.7186078182786649| 0.9123361330966105| 0.601837718628886| 0.0773272396167538|[0.50575898773332...| 0.278222|
| 0.2815336141913932| 0.5196112020157087| 0.9646444599173869|0.04844988843812004|0.35445251642633047|[0.28153361419139...| 0.10699606|
| 0.3896101050146765|0.38732747821339863| 0.8516864705178889| 0.2500977280156421| 0.7781221754566505|[0.38961010501467...| -0.08206403|
| 0.8223344715797269| 0.9089425281658239|0.10088026161623431| 0.9920995834835098|0.40665125930441104|[0.82233447157972...| 0.3565607|
| 0.31167413110257425| 0.9778009876605741| 0.4717549025588036|0.24563879994222826| 0.7594244867194454|[0.31167413110257...| 0.18897778|
| 0.5667657426129576| 0.5383639427018171| 0.2983527299596511|0.18914810241640534|0.47854422807435326|[0.56676574261295...| 0.17796803|
| 0.6419824467244137|0.03992370080139418|0.38462617679839173| 0.709487894249459|0.23020927682221126|[0.64198244672441...| 0.15635887|
| 0.7972928622000178| 0.7700992684264264| 0.4387404431803098| 0.1340696629092989| 0.7072213018683782|[0.79729286220001...| 0.0500246|
+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------+
def batch_predict(x):
model = get_model_for_eval()
xp = np.vstack(x)
t = torch.tensor(xp, dtype=torch.float32)
prediction = model(t).cpu().detach().numpy().flatten()
return pd.Series(prediction)
df_pd = df.toPandas()
x = df_pd['features']
print(batch_predict(x))
batch_udf = pandas_udf(batch_predict, FloatType())
df = df.withColumn('pred_batch', batch_udf(col('features')))
df.show()
20/02/11 10:13:01 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
最佳答案
所以显然这个问题是由于 spark 2.4.x 和 pyarrow >= 0.15 之间的不兼容造成的。看这里:
import os
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'
关于pandas - 如何使用 pandas_udf 在 pyspark 数据帧上运行 pytorch 模型的推理(创建带有预测的新列)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60074543/
现在,我正在使用 MALLET 包中的 LDA 主题建模工具对我的文档进行一些主题检测。最初一切都很好,我从中得到了 20 个主题。但是,当我尝试使用该模型推断新文档时,结果有点莫名其妙。 例如,我故
我正在使用 Jersey 在 Scala 中开发 REST web 服务JAX-RS 引用实现,我收到一个奇怪的错误。 我正在尝试创建一个 ContentDisposition对象使用 Content
以下两个用于计算斐波那契数列第 n 项的 Haskell 程序具有截然不同的性能特征: fib1 n = case n of 0 -> 1 1 -> 1 x -> (fib
所以在来自 another question 的评论中,我刚刚看到了这个计算字符串中 L 数量的例子: "hello".count('l'==) 而且够疯狂……它有效。 从完全扩展的版本开始,我们有:
我在 android 上运行训练有素的 yolov2 网络时遇到问题。我正在使用这个项目进行测试 https://github.com/szaza/android-yolo-v2 . 提供的网络工作正
我目前在我的 iOS 应用程序中使用 Tensorflow 的 Swift 版本。我的模型工作正常,但我无法将数据复制到第一个张量中,因此我可以使用神经网络来检测东西。 我咨询了the testsui
我有一个 SSD tflite 检测模型,正在台式计算机上使用 Python 运行。就目前而言,我的下面的脚本将单个图像作为推理的输入,并且运行良好: # Load TFLite model
我所拥有的:在 Tensorflow 中经过训练的递归神经网络。 我想要的:一个可以尽可能快地运行这个网络的移动应用程序(只有推理模式,没有训练)。 我相信有多种方法可以实现我的目标,但我希望您能提供
**我得到了一些让我的函数成为纯通用函数的建议,这可行,但我更愿意将函数限制为仅接受 Base 及其子项。 在创建可以接受可变模板类基类型参数的函数时遇到问题,而该函数实际上将使用从 Base 派生的
我想使用 TF 2.0 在我的 GPU 集群上运行分布式预测。我使用 MirroredStrategy 训练了一个用 Keras 制作的 CNN 并保存了它。我可以加载模型并在其上使用 .predic
实现一个 C++ 代码来加载一个已经训练好的模型然后获取它而不是使用 Python 真的值得吗? 我想知道这一点,因为据我所知,用于 python 的 Tensorflow 是幕后的 C++(对于 n
我将在网站上提供 pytorch 模型(resnet18)。 然而,在 cpu(amd3600) 中进行推理需要 70% 的 cpu 资源。 我不认为服务器(heroku)可以处理这个计算。 有什么方
为了充分利用 CPU/GPU,我运行了多个对不同数据集进行 DNN 推理(前馈)的进程。由于进程在前馈期间分配了 CUDA 内存,因此我收到了 CUDA 内存不足错误。为了缓解这种情况,我添加了 to
你知道用 1 个 GPU tensorflow 对 2 个 python 进程进行推理的优雅方法吗? 假设我有 2 个进程,第一个是分类猫/狗,第二个是分类鸟/飞机,每个进程运行不同的 tensorf
我是 Scala 的初学者,不明白这里发生了什么: 给定: val reverse:Option[MyObject] = ... 并且myObject.isNaire返回 bool 值。 如果我这样做
我正在尝试通过用我常用的语言 Clojure 实现算法 W 来自学 Hindley-Milner 类型推理。我遇到了 let 推理的问题,我不确定我是否做错了什么,或者我期望的结果是否需要算法之外的东
我正在尝试通过用我常用的语言 Clojure 实现算法 W 来自学 Hindley-Milner 类型推理。我遇到了 let 推理的问题,我不确定我是否做错了什么,或者我期望的结果是否需要算法之外的东
我做了一个项目,基本上使用带有 tensorflow 的 googles object detection api。 我所做的只是使用预训练模型进行推理:这意味着实时对象检测,其中输入是网络摄像头的视
我有一台带有多个 GPU 的服务器,我想在 Java 应用程序内的模型推理期间充分利用它们。默认情况下,tensorflow 占用所有可用的 GPU,但仅使用第一个。 我可以想到三个选项来解决这个问题
这个预测时间190ms,应该是cpu版本 昨天修改了个OpenCV DNN支持部署YOLOv5,6.1版本的Python代码,今天重新转换为C 代码了!貌似帧率比之前涨了点!说明C的确是比Python
我是一名优秀的程序员,十分优秀!