我试图从下面的数据中得到一个结果集。显示的是示例数据。我正在尝试为name和department设置不同的值。我可以看到几个与获得不同计数相关的答案,但找不到与我的场景匹配的答案,或者可能在我找不到的地方。
employeeDF = sqlContext.createDataFrame([('1235','Hary','IT','U'),
('879','Jack','PTA','R'),
('32569','Hary','IT','T'),
('4598','MiKe','HR','Y')],
['ID','Name','Department','Tag'])
(employeeDF.show())
employeeDF.createOrReplaceTempView("employee")
+-----+----+----------+--+
| ID|Name|Department|Tag|
+-----+----+----------+--+
| 1235|Hary| IT|U|
| 879|Jack| PTA|R|
|32569|Hary| IT|T|
| 4598|MiKe| HR|Y|
+-----+----+----------+--+
所以在数据集中你可以看到hary有两个不同的id,所以我认为这是垃圾数据,我不希望它们出现在我的结果集中。我想实现的是一个没有重复id的集合
我的预期产出是
++----+-------------+
|ID |Name|Department|
+----+----+----------+
|879 |Jack| PTA|
|4598|MiKe| HR|
+----+----+----------+
通过在下面运行查询,我可以通过按姓名和部门分组得到下面的集合,但同时我还需要id。我无法在group by中使用id,因为我正在尝试获取具有唯一id的名称和部门的不同集合
df = sqlContext.sql("select Name,Department from employee group by Name,Department having (count(distinct ID) =1 )")
df.show()
+----+----------+
|Name|Department|
+----+----------+
|Jack| PTA|
|MiKe| HR|
+----+----------+
更新1这里我复制了mike的条目,以确保collect\u set gets删除重复的id,并对name和department进行计数。
employeeDF = sqlContext.createDataFrame([('1235','Hary','IT'),
('879','Jack','PTA'),
('32569','Hary','IT'),
('4598','MiKe','HR'),('4598','MiKe','HR')],
['ID','Name','Department'])
(employeeDF.show())
employeeDF.createOrReplaceTempView("employee")
from pyspark.sql import functions as F, Window
result = employeeDF.withColumn(
'count_id',
F.size(F.collect_set('ID').over(Window.partitionBy('Name', 'Department')))
).filter('count_id = 1').drop('count_id')
result.show()
+-----+----+----------+--------+
| ID|Name|Department|count_id|
+-----+----+----------+--------+
| 879|Jack| PTA| 1|
| 4598|MiKe| HR| 1|
| 4598|MiKe| HR| 1|
|32569|Hary| IT| 2|
| 1235|Hary| IT| 2|
+-----+----+----------+--------+
mike的条目是一个具有相同id的重复条目,因此在使用group by of(mike,hr)时,我需要将它们作为计数为1的条目,因为id在名称和hr方面是相同的
预期结果
+-----+----+----------+--------+
| ID|Name|Department|count_id|
+-----+----+----------+--------+
| 879|Jack| PTA| 1|
| 4598|MiKe| HR| 1|
|32569|Hary| IT| 2|
| 1235|Hary| IT| 2|
+-----+----+----------+--------+
2条答案
按热度按时间qhhrdooz1#
您可以使用
size(collect_set())
:eoigrqb62#
你可以用
exists
在spark sql中: