我试图复制给定的代码,看看如何 foreach
工作,我尝试了以下代码:
rdd = sc.parallelize([1,2,3,4,5])
def f(a):
print(a)
rdd.collect().foreach(f)
但它给出了以下错误:
attributeerror:“list”对象没有“foreach”属性
我理解返回类型的错误 collect()
是一个 array
(这是列表)并且它没有 foreach
但是,我不明白如果在官方文件中给出的话,这怎么会不起作用 spark 3.0.1
文档。我错过了什么。我正在使用 Spark 3.0.1
1条答案
按热度按时间jexiocij1#
rdd.collect()
是一个将数据收集到驱动程序的spark动作。这个collect
方法返回一个列表,因此如果要打印列表的元素,只需执行以下操作:另一种方法是使用另一个动作
foreach
例如,在你的例子中提到的rdd.foreach(f)
. 尽管,由于spark的分布特性,不能保证print
命令将数据发送到驱动程序的输出流。这就意味着你永远看不到这些数据被打印出来。为什么?因为spark有两种部署模式,
cluster
以及client
模式。在集群模式下,驱动程序在集群中的任意节点上运行。另一方面,在客户机模式下,驱动程序在提交spark作业的机器上运行,因此spark驱动程序和程序共享相同的输出流。因此,您应该始终能够看到程序的输出。相关链接
https://stackoverflow.com/questions/41124428/spark-yarn-cluster-vs-client-how-to-choose-which-one-to-use#:~:text=in%20cluster%20mode%2c%20the%20park,用于从%20yarn请求%20resources%20s。
https://spark.apache.org/docs/latest/cluster-overview.html
https://blog.knoldus.com/cluster-vs-client-execution-modes-for-a-spark-application/