I can't reproduce the Spark code from the available PySpark documentation.
For example, when I try the following code for Grouped Map:
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark.stop()
spark = SparkSession.builder.appName("New_App_grouped_map").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
I get the following error:
ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
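Installed package versions:
pyarrow==0.17.1
pandas==1.0.4
numpy==1.18.4
I downloaded the Spark folder to C:\spark\, so I am not sure whether I have to move the pyarrow package, which I installed globally, into the Spark folder. Is this the problem?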
>>> df.groupby("id").apply(subtract_mean).show()
[Stage 16:======================================================>(99 + 1) / 100]20/05/
30 16:57:17 ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 577, in main
File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 837, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonExc
eption(PythonRunner.scala:484)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(Python
ArrowOutput.scala:99)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(Python
ArrowOutput.scala:49)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonR
unner.scala:437)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:
37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorF
orCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowItera
tor.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeS
tageCodegenExec.scala:726)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPl
an.scala:321)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala
:872)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala
:441)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecu
tor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExec
utor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.Direct
ByteBuffer.<init>(long, int) not available
at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.jav
a:473)
at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(Arro
wRecordBatch.java:222)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSeri
alizer.java:240)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:1
32)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$wr
iteIteratorToStream$1(ArrowPythonRunner.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIterat
orToStream(ArrowPythonRunner.scala:101)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Py
thonRunner.scala:373)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.
scala:213)
20/05/30 16:57:17 ERROR ArrowPythonRunner: This may have been caused by a prior except
ion:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.
<init>(long, int) not available
at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.jav
a:473)
at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(Arro
wRecordBatch.java:222)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSeri
alizer.java:240)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:1
32)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$wr
iteIteratorToStream$1(ArrowPythonRunner.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIterat
orToStream(ArrowPythonRunner.scala:101)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Py
thonRunner.scala:373)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.
scala:213)
20/05/30 16:57:17 ERROR Executor: Exception in task 44.0 in stage 16.0 (TID 159)
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.
<init>(long, int) not available
at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.jav
a:473)
at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(Arro
wRecordBatch.java:222)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSeri
alizer.java:240)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:1
32)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$wr
iteIteratorToStream$1(ArrowPythonRunner.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIterat
orToStream(ArrowPythonRunner.scala:101)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Py
thonRunner.scala:373)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.
scala:213)
20/05/30 16:57:17 ERROR TaskSetManager: Task 44 in stage 16.0 failed 1 times; aborting
job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\spark\python\pyspark\sql\dataframe.py", line 407, in show
print(self._jdf.showString(n, 20, vertical))
File "C:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1286, in
__call__
File "C:\spark\python\pyspark\sql\utils.py", line 98, in deco
return f(*a, **kw)
File "C:\spark\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 328, in get_
return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o170.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage
16.0 failed 1 times, most recent failure: Lost task 44.0 in stage 16.0 (TID 159, DESKT
OP-ASG768U, executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe
or java.nio.DirectByteBuffer.<init>(long, int) not available
at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.jav
a:473)
at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(Arro
wRecordBatch.java:222)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSeri
alizer.java:240)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:1
32)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$wr
iteIteratorToStream$1(ArrowPythonRunner.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIterat
orToStream(ArrowPythonRunner.scala:101)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Py
thonRunner.scala:373)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.
scala:213)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGSche
duler.scala:1989)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.
scala:1977)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGSc
heduler.scala:1976)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGS
cheduler.scala:956)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adap
ted(DAGScheduler.scala:956)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.sc
ala:956)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGSche
duler.scala:2206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGSchedu
ler.scala:2155)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGSchedu
ler.scala:2144)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:431)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:
47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(
SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecu
tion.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecutio
n.scala:87)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Meth
od)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethod
AccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Delegati
ngMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.Direct
ByteBuffer.<init>(long, int) not available
at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.jav
a:473)
at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(Arro
wRecordBatch.java:222)
at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSeri
alizer.java:240)
at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:1
32)
at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$wr
iteIteratorToStream$1(ArrowPythonRunner.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIterat
orToStream(ArrowPythonRunner.scala:101)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(Py
thonRunner.scala:373)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.
scala:213)
Best answer
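Spark 3.0 supports Java 11, and on Java 11 there is a known issue with the Arrow integration that PySpark uses for Pandas UDFs. If you don't want to downgrade to Java 8, you can follow the instructions below.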
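To decide whether the workaround applies, it can help to know which Java major version is in use. The following is a minimal sketch, not part of the original answer: `java_major_version` and `needs_netty_reflection_flag` are hypothetical helper names, the input is assumed to be the text printed by `java -version`, and the Java 9 threshold is an assumption based on Netty restricting reflective access from that release onward.

```python
import re


def java_major_version(version_output: str) -> int:
    """Extract the Java major version from `java -version` style output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no Java version string found")
    first = int(match.group(1))
    # Java 8 and earlier report themselves as 1.x, for example 1.8.0_252.
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    return first


def needs_netty_reflection_flag(major: int) -> bool:
    # Assumption: the flag is only relevant on Java 9 and later.
    return major >= 9


if __name__ == "__main__":
    sample = 'openjdk version "11.0.7" 2020-04-14'
    major = java_major_version(sample)
    print(major, needs_netty_reflection_flag(major))
```

The real output can be captured with `subprocess.run(["java", "-version"], capture_output=True, text=True).stderr`, since `java -version` writes to standard error.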
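Since you are using PySpark on your local machine, you need to go to
$SPARK_HOME/conf/spark-defaults.conf.template
In your case, that is
C:\Spark\conf\spark-defaults.conf.template
Make a copy of this file (or rename it to spark-defaults.conf) and add the following at the bottom of the file: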
spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
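The copy-and-append step can also be scripted. This is a sketch under stated assumptions rather than an official Spark tool: `ensure_spark_defaults` is a hypothetical helper, and the directory you pass in is assumed to be your own `conf` folder (for example `C:\Spark\conf`).

```python
import shutil
from pathlib import Path

NETTY_FLAG = '"-Dio.netty.tryReflectionSetAccessible=true"'
REQUIRED_LINES = [
    f"spark.driver.extraJavaOptions={NETTY_FLAG}",
    f"spark.executor.extraJavaOptions={NETTY_FLAG}",
]


def ensure_spark_defaults(conf_dir):
    """Create spark-defaults.conf if needed and append any missing option lines."""
    conf_dir = Path(conf_dir)
    template = conf_dir / "spark-defaults.conf.template"
    target = conf_dir / "spark-defaults.conf"
    if not target.exists():
        if template.exists():
            # Start from the shipped template so its comments are preserved.
            shutil.copyfile(template, target)
        else:
            target.touch()
    text = target.read_text(encoding="utf-8")
    present = {line.strip() for line in text.splitlines()}
    missing = [line for line in REQUIRED_LINES if line not in present]
    if missing:
        if text and not text.endswith("\n"):
            text += "\n"
        text += "\n".join(missing) + "\n"
        target.write_text(text, encoding="utf-8")
    return target
```

Running it twice leaves the file unchanged the second time. It only checks for these exact lines, so if the file already sets `extraJavaOptions` to something else, merge the values by hand instead.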
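When you start PySpark, go to the Spark UI (usually localhost:4040) and look for the "Environment" tab. Under "Spark Properties" you should see the two options listed.
You can also pass these options as --conf arguments when starting PySpark, but I find it easier to set them as defaults.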
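For the --conf route, the command line could be assembled as in the sketch below. `pyspark_command` is a hypothetical helper, and it assumes the `pyspark` launcher is on your PATH; the property names and the flag are the ones from the answer.

```python
NETTY_OPTION = "-Dio.netty.tryReflectionSetAccessible=true"


def pyspark_command(executable="pyspark"):
    """Build the argument list that passes the flag to the driver and executors."""
    args = [executable]
    for key in ("spark.driver.extraJavaOptions", "spark.executor.extraJavaOptions"):
        args += ["--conf", f"{key}={NETTY_OPTION}"]
    return args


if __name__ == "__main__":
    # Print the command so it can be pasted into a terminal.
    print(" ".join(pyspark_command()))
```

The list form can be handed directly to `subprocess.run`, which avoids shell quoting differences between Windows and other platforms.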
Regarding apache-spark - Error with PySpark pandas_udf documentation code: 'java.lang.UnsupportedOperationException', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62109276/