gpt4 book ai didi

exception - 应用程序运行一段时间后Pyspark套接字超时异常

转载 作者:行者123 更新时间:2023-12-02 22:14:14 29 4
gpt4 key购买 nike

我正在使用 pyspark 来估计逻辑回归模型的参数。我使用spark来计算可能性和梯度,然后使用scipy的最小化函数进行优化(L-BFGS-B)。

我使用 yarn 客户端模式来运行我的应用程序。我的应用程序可以毫无问题地开始运行。然而,一段时间后,它报告以下错误:

Traceback (most recent call last):
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
options={'disp': False})
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
callback=callback, **options)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
f, g = func_and_grad(x)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
f = fun(x, *args)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
return function(*(wrapper_args + args))
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
yield self._read_with_length(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
data = self._sock.recv(left)
socket.timeout: timed out

当我将 Spark 日志级别设置为“ALL”时,我还发现了 python Broken Pipe 错误。

我使用的是 Spark 1.6.2 和 Java 1.8.0_91。知道发生了什么吗?

--更新--

我发现这与我在程序中使用的优化例程有关。

我所做的是使用 EM 算法(作为迭代算法)通过最大似然法估计统计模型。在每次迭代期间,我需要通过解决最小化问题来更新参数。 Spark 负责计算我的可能性和梯度,然后将其传递给 Scipy 的最小化例程,我在其中使用 L-BFGS-B 方法。看来这个例程中的某些东西使我的 Spark 工作崩溃了。但我不知道例程的哪一部分导致了这个问题。

另一个观察结果是,在使用相同的示例和相同的程序时,我更改了分区的数量。当分区数量较小时,我的程序可以毫无问题地完成。然而,当分区数量变大时,程序开始崩溃。

最佳答案

我也遇到过类似的问题。我进行了一次迭代,有时执行时间太长以至于超时。增加 Spark.executor.heartbeatInterval 似乎可以解决问题。我将其增加到 3600 秒,以确保我不会再次遇到超时,并且从那时起一切都工作正常。

来自:http://spark.apache.org/docs/latest/configuration.html :

spark.executor.heartbeatInterval 10s Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.

关于exception - 应用程序运行一段时间后Pyspark套接字超时异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38417441/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com