I am new to PySpark and I am trying to run multiple Prophet time-series forecasts with PySpark (as a distributed computation, since I have more than 100 time series to forecast), but I get the error below.
import time
start_time = time.time()
sdf = spark.createDataFrame(data)
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))
sdf.createOrReplaceTempView('Quantity')
spark.sql("select Reseller_City, Business_Unit, count(*) from Quantity group by Reseller_City, Business_Unit order by Reseller_City, Business_Unit").show()
query = 'SELECT Reseller_City, Business_Unit, conditions, black_week, promos, Sales_Date as ds, sum(Rslr_Sales_Quantity) as y FROM Quantity GROUP BY Reseller_City, Business_Unit, conditions, black_week, promos, ds ORDER BY Reseller_City, Business_Unit, ds'
spark.sql(query).show()
sdf.rdd.getNumPartitions()
store_part = (spark.sql(query).repartition(spark.sparkContext.defaultParallelism, ['Reseller_City', 'Business_Unit'])).cache()
store_part.explain()
from pyspark.sql.types import *

result_schema = StructType([
    StructField('ds', TimestampType()),
    StructField('Reseller_City', StringType()),
    StructField('Business_Unit', StringType()),
    StructField('y', DoubleType()),
    StructField('yhat', DoubleType()),
    StructField('yhat_upper', DoubleType()),
    StructField('yhat_lower', DoubleType())
])
from prophet import Prophet  # on older installs: from fbprophet import Prophet
from pyspark.sql.functions import pandas_udf, PandasUDFType, current_date

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_sales(store_pd):
    # One Prophet model per (Reseller_City, Business_Unit) group.
    model = Prophet(interval_width=0.95, holidays=lock_down)  # lock_down holidays DataFrame defined elsewhere
    model.add_country_holidays(country_name='DE')
    model.add_regressor('conditions')
    model.add_regressor('black_week')
    model.add_regressor('promos')

    # Split the group into training history and the horizon to forecast.
    train = store_pd[store_pd['ds'] < '2021-10-01 00:00:00']
    future_pd = store_pd[store_pd['ds'] >= '2021-10-01 00:00:00']

    model.fit(train[['ds', 'y', 'conditions', 'black_week', 'promos']])
    forecast_pd = model.predict(future_pd[['ds', 'conditions', 'black_week', 'promos']])

    f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')
    # store_pd = store_pd.filter(store_pd['ds'] < '2021-10-01 00:00:00')
    st_pd = future_pd[['ds', 'Reseller_City', 'Business_Unit', 'y']].set_index('ds')

    # Join the forecast back onto the actuals and restore the group keys.
    results_pd = f_pd.join(st_pd, how='left')
    results_pd.reset_index(level=0, inplace=True)
    results_pd[['Reseller_City', 'Business_Unit']] = future_pd[['Reseller_City', 'Business_Unit']].iloc[0]

    return results_pd[['ds', 'Reseller_City', 'Business_Unit', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]
results = (store_part.groupBy(['Reseller_City','Business_Unit']).apply(forecast_sales).withColumn('training date', current_date() ))
results.cache()
results.show()
All the lines execute fine, but the error comes from the results.show() line. I don't understand where I went wrong, and I would be grateful if someone could help.
Py4JJavaError Traceback (most recent call last)
<ipython-input-46-8c647e8bf4d9> in <module>
----> 1 results.show()
C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o128.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 1243, Grogu.profiflitzer.local, executor driver): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.readInt(Unknown Source)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.readInt(Unknown Source)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Best answer
Make sure you have set spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true").
Read more here: https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html
If that does not work, try increasing the memory size of the driver and the workers.
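A minimal sketch of those two suggestions (the config keys are standard Spark settings; the memory values and app name are placeholder assumptions to adjust for your own cluster):

```python
from pyspark.sql import SparkSession

# Sketch only: enable Arrow for pandas UDFs and raise memory limits.
# The memory sizes below are assumptions; tune them to your data and cluster.
spark = (SparkSession.builder
         .appName('prophet-forecasts')                              # hypothetical app name
         .config('spark.sql.execution.arrow.pyspark.enabled', 'true')
         .config('spark.driver.memory', '8g')                       # driver heap
         .config('spark.executor.memory', '8g')                     # executor (worker) heap
         .getOrCreate())

# On an already-running session, the Arrow flag can also be toggled like this:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
```

Note that driver and executor memory generally have to be set before the session (or the JVM behind it) is created, so setting them in the builder or in spark-submit options is the safer route.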
For "python - PySpark: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, java.net.SocketException: Connection reset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/69973790/