
python - Spark refuses to create an empty DataFrame when using pyarrow


I want to create an empty dataframe from an existing Spark dataframe. I have pyarrow support enabled in the Spark conf. When I try to create the empty dataframe from an empty RDD and the schema of the existing dataframe, I get a java.lang.NegativeArraySizeException. Here is the complete code to reproduce the error:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
empty_pandas_df = empty_df.toPandas()  # raises java.lang.NegativeArraySizeException

Here is the full stack trace:

/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
    at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
    at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)

  warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
2120 _check_dataframe_localize_timestamps
2121 import pyarrow
-> 2122 batches = self._collectAsArrow()
2123 if len(batches) > 0:
2124 table = pyarrow.Table.from_batches(batches)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
2182 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
2183 finally:
-> 2184 jsocket_auth_server.getResult() # Join serving thread and raise any exceptions
2185
2186 ##########################################################################################

/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

The error goes away when I disable pyarrow:

spark.conf.set("spark.sql.execution.arrow.enabled","false")
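If Arrow needs to stay enabled for the rest of the session, the setting can also be toggled off just around the failing conversion. A minimal sketch, not from the original post:

previous = spark.conf.get("spark.sql.execution.arrow.enabled")
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
try:
    empty_pandas_df = empty_df.toPandas()  # takes the slower non-Arrow path
finally:
    # Restore whatever the session had before.
    spark.conf.set("spark.sql.execution.arrow.enabled", previous)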

Is this a known issue in pyspark, or is it related to pyarrow?

Note: this error is only reproducible with pyspark >= 2.4.4.
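A version-independent way to sidestep the failure is to avoid the Arrow collection entirely when the frame has no rows. A minimal sketch; the helper name safe_to_pandas is ours, not part of any API:

import pandas as pd

def safe_to_pandas(sdf):
    # Hypothetical helper: return an empty pandas frame with matching
    # columns instead of letting toPandas() enter the Arrow code path.
    if sdf.rdd.isEmpty():
        return pd.DataFrame(columns=sdf.columns)
    return sdf.toPandas()

empty_pandas_df = safe_to_pandas(empty_df)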

Best Answer

The workaround is to collect the RDD and build the pandas dataframe from the result, as shown below. (A separate problem in the code was a ':' that should have been a ','.)

from pyspark.sql import SparkSession
import pyarrow as pa
import pandas as pd


spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")

# Collect the rows and build the pandas frame directly, instead of calling
# toPandas(), which triggers the Arrow error on an empty dataframe.
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)
rows = empty_df.collect()
empty_pandas_df = pd.DataFrame(rows)

print(empty_pandas_df)
df.show()

Output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Empty DataFrame
Columns: []
Index: []
[Stage 2:> (0 + 3) / 3]
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+
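Note that the output shows Columns: []: building a pandas frame from an empty list of rows loses the column names. Passing them explicitly preserves the schema; a small variation on the workaround above, reusing rows from it:

empty_pandas_df = pd.DataFrame(rows, columns=empty_df.columns)
print(empty_pandas_df)  # Empty DataFrame, Columns: [age]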

Regarding "python - Spark refuses to create an empty DataFrame when using pyarrow", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58014063/
