我用pyspark来估计logistic回归模型的参数。我使用spark计算似然和梯度,然后使用scipy的最小化函数进行优化(l-bfgs-b)。
我使用客户机模式运行我的应用程序。我的应用程序可以毫无问题地开始运行。但是,一段时间后,它会报告以下错误:
Traceback (most recent call last):
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
options={'disp': False})
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
callback=callback,**options)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
f, g = func_and_grad(x)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
f = fun(x, *args)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
return function(*(wrapper_args + args))
File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
yield self._read_with_length(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
data = self._sock.recv(left)
socket.timeout: timed out
我还发现 python broken pipe
将spark log level设置为“all”时出错。
我使用的是spark 1.6.2和Java1.8.0\u91。知道怎么回事吗?
--更新--
我发现这与我在程序中使用的优化例程有关。
我所做的是使用em算法(作为迭代算法)用最大似然法估计统计模型。在每次迭代中,我需要通过解决最小化问题来更新参数。spark负责计算我的似然和梯度,然后传递给scipy的最小化例程,在那里我使用l-bfgs-b方法。看来这套程序中有什么东西毁了我的工作。但我不知道是哪一部分造成了这个问题。
另一个观察结果是,在使用相同的示例和相同的程序时,我更改了分区的数量。当分区数很小时,我的程序可以毫无问题地完成。但是,当分区数量变大时,程序开始崩溃。
4条答案
按热度按时间laximzn51#
我们在ibm的spss modeler中使用pyspark扩展节点时遇到了相同的问题。所有上述解决方案(以及在互联网上可以找到的其他解决方案)都不起作用。在某个时刻,我们发现当我和同事在同一台机器上同时执行pyspark扩展节点时,总是会发生这种情况。这似乎让python的工作人员陷入了混乱或被杀的境地。唯一的解决办法是不要同时执行pyspark的东西。。。
monwx1rj2#
我有一个类似的问题,对我来说,这解决了它:
在此处设置其他选项的更多示例:https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad
gkl3eglg3#
我也有类似的问题。我有一个迭代,有时执行时间太长,超时。增加的
spark.executor.heartbeatInterval
似乎解决了问题。我将它增加到3600,以确保我不会再次遇到超时,从那时起一切都正常工作。发件人:http://spark.apache.org/docs/latest/configuration.html :
spark.executor.heartbeatinterval每个执行者对驱动程序的心跳间隔10s。heartbeats让驱动程序知道executor仍然活着,并用进行中任务的度量来更新它。
lx0bsm1f4#
查看执行者日志以了解详细信息。当执行器死亡或被集群管理器杀死时(通常是因为使用了比容器配置更多的内存),我也看到过类似的错误。