Spark Dataframe :collect()与select()

31moq8wy  于 2023-04-07  发布在  Apache
关注(0)|答案(6)|浏览(160)

在RDD上调用collect()会将整个数据集返回给驱动程序,这可能会导致内存不足,我们应该避免这种情况。
如果在 Dataframe 上调用collect(),其行为是否相同?
select()方法是什么?
如果在dataframe上调用它,它的工作方式是否与collect()相同?

jyztefdp

jyztefdp1#

行动vs转变

  • Collect(Action)-在驱动程序中以数组的形式返回数据集的所有元素。这在过滤器或其他返回足够小的数据子集的操作之后通常很有用。

spark-sql doc

select(*cols)(transformation)-投射一组表达式并返回一个新的DataFrame。

参数:cols -列名(字符串)或表达式(列)的列表。如果其中一个列名为'*',则该列将扩展为包括当前DataFrame中的所有列。**

df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

在dataframe上执行select(column-name1,column-name2,etc)方法,返回一个新的dataframe,它只包含在select()函数中选择的列。
例如,假设df具有包括“名称”和“值”以及其他一些列的若干列。

df2 = df.select("name","value")

df2将只保存df的整个列中的两列(“name”和“value
作为select结果的df 2将在执行器中,而不是在驱动程序中(与使用collect()的情况一样)
sql-programming-guide

df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

您可以在dataframe上运行collect()(spark docs)

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]

spark docs
要打印驱动程序上的所有元素,可以使用collect()方法首先将RDD带到驱动程序节点,如下所示:但是,这可能会导致驱动程序耗尽内存,因为collect()会将整个RDD提取到一台机器上**;如果你只需要打印RDD的几个元素,一个更安全的方法是使用take():rdd.take(100).foreach(println).

h79rfbju

h79rfbju2#

调用select将导致延迟计算。例如:

val df1 = df.select("col1")
val df2 = df1.filter("col1 == 3")

上面的两个语句都创建了惰性路径,当你在df上调用一个动作时,这些路径将被执行,比如showcollect等。

val df3 = df2.collect()

在您的转换结束时使用.explain以遵循其计划。以下是更详细的信息:Transformations and Actions

olqngx59

olqngx593#

Select用于投影dataframe的部分或全部字段。它不会给予value,而是新的dataframe。它是transformation

t2a7ltrp

t2a7ltrp4#

直接回答问题:
如果在 Dataframe 上调用collect(),其行为是否相同?
是的,spark.DataFrame.collect在功能上与spark.RDD.collect相同。它们在这些不同的对象上具有相同的目的。
select()方法是什么?
没有spark.RDD.select这样的东西,所以它不能与spark.DataFrame.select相同。
如果在dataframe上调用它,它的工作方式是否与collect()相同?
selectcollect之间唯一的相似之处是它们都是DataFrame上的函数。它们在功能上绝对没有重叠。
以下是我自己的描述:collectsc.parallelize相反。select与任何SQL语句中的SELECT相同。
如果您仍然无法理解collect实际上做了什么(对于RDD或DataFrame),那么您需要查找一些关于spark在幕后做了什么的文章。

pw9qyyiw

pw9qyyiw5#

Select是一个转换,而不是一个动作,所以它是惰性计算的(实际上不会进行计算,只是Map操作)。
尝试:
df.limit(20).collect()

kxkpmulp

kxkpmulp6#

简短的回答用粗体表示:

  • collect主要是序列化

(loss保留 Dataframe 的所有其他数据特性的并行性)
例如,对于PrintWriter pw,您不能直接使用df.foreach( r => pw.write(r) ),必须在foreachdf.collect.foreach(etc)之前使用collect
PS:“并行性的损失”不是“完全损失”,因为在序列化之后,它可以再次分发给执行器。

(only在框架的上下文中类似,因为Spark select不会删除重复数据)。
因此,它也是框架上下文中filter的补充。
对其他答案的评论解释:我喜欢 transformations(如select)和 actions(如collect)中的Jeff's classification of Spark operations。还请记住 transforms(包括select)是lazily evaluated

相关问题