我的数据集看起来像这样,其中有一组逗号分隔的字符串值,分别为col1
和col2
。col3
是连接在一起的两列。
+===========+========+=============
|col1 |col2 |col3
+===========+========+=============
|a,b,c,d |a,c,d |a,b,c,d,a,c,d
|e,f,g |f,g,h |e,f,g,f,g,h
+===========+========+=============
基本上,我尝试做的是获取col3
中所有用逗号分隔的值,并将每个值及其计数附加到另一列。
我尝试在col4
中得到这样的输出:
+===========+========+==============+======================
|col1 |col2 |col3 |col4
+===========+========+==============+======================
|a,b,c,d |a,c,d |a,b,c,d,a,c,d |a: 2, b: 1, c: 2, d: 2
|e,f,g |f,g,h |e,f,g,f,g,h |e: 1, f: 2, g: 2, h: 1
+===========+========+==============+======================
我已经找到了如何将列concat
组合在一起以得到col3
,但在得到col4
时遇到了一些麻烦。这里是我离开的地方,我有点不确定从这里去哪里:
from pyspark.sql.functions import concat, countDistinct
df = df.select(concat(df.col1, df.col2).alias('col3'), '*')
df.agg(countDistinct('col3')).show()
# +--------------------+
# |count(DISTINCT col3)|
# +--------------------+
# | 2|
# +--------------------+
如何动态统计col3
中以逗号分隔的子字符串,并创建一个最后一列,显示数据集中所有行的每个子字符串的频率?
2条答案
按热度按时间fnx2tebb1#
使用自定义项
这里有一个使用udfs的方法。首先是数据生成。
然后使用一些原生Python函数,如Counter和json来完成任务。
结果
使用原生pyspark函数的解决方案
这个解决方案比使用udf要复杂一些,但是由于没有udf,它的性能可能会更好。这个想法是concat三个字符串列和爆炸。为了知道每一个被分解的行是从哪里来的,我们添加了一个索引。双重分组将帮助我们得到想要的结果。最后,我们将结果连接回原始框架,以获得所需的模式。
导致
请注意,我们在udf部分创建的json通常比使用原生pyspark函数在
grouped_counts
列中创建的简单字符串更方便。qnakjoqk2#
我建议使用原生Spark选项,使用 * 数组 * 而不是字符串。
输出类型为
map
而不是string
。通过这种方式,可以使用例如F.col('counts')['g']
。