Select all or specific columns that are not in the GROUP BY, using PySpark or Spark SQL

dpiehjr4 · posted 2021-07-13 in Spark

I am trying to get a result set from the data below (sample data shown). I want distinct values of Name and Department. I can see several answers related to getting distinct counts, but I could not find one that matches my scenario, or perhaps it exists somewhere I have not looked.

employeeDF = sqlContext.createDataFrame([('1235', 'Hary', 'IT', 'U'),
                                         ('879', 'Jack', 'PTA', 'R'),
                                         ('32569', 'Hary', 'IT', 'T'),
                                         ('4598', 'MiKe', 'HR', 'Y')],
                                        ['ID', 'Name', 'Department', 'Tag'])
employeeDF.show()
employeeDF.createOrReplaceTempView("employee")
+-----+----+----------+---+
|   ID|Name|Department|Tag|
+-----+----+----------+---+
| 1235|Hary|        IT|  U|
|  879|Jack|       PTA|  R|
|32569|Hary|        IT|  T|
| 4598|MiKe|        HR|  Y|
+-----+----+----------+---+

In the data set you can see that Hary has two different IDs, so I consider those rows junk data and do not want them in my result set. What I want is a set without duplicated IDs.
My expected output is:

+----+----+----------+
|  ID|Name|Department|
+----+----+----------+
| 879|Jack|       PTA|
|4598|MiKe|        HR|
+----+----+----------+

By running the query below, grouping by Name and Department, I can get the set shown underneath, but at the same time I also need the ID. I cannot add ID to the GROUP BY, because I am trying to get the distinct set of Name and Department that have a unique ID.

df = sqlContext.sql("select Name,Department from employee group by Name,Department having (count(distinct ID) =1 )")
df.show()
+----+----------+
|Name|Department|
+----+----------+
|Jack|       PTA|
|MiKe|        HR|
+----+----------+
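One way to carry the ID through this grouped result (a sketch added here for illustration, not something from the original post; it reuses the employee temp view registered above) is to join the one-ID groups back to the table on Name and Department:

# Sketch: recover the ID by joining the (Name, Department) groups that
# have exactly one distinct ID back to the original employee view.
df = sqlContext.sql("""
    select e.ID, e.Name, e.Department
    from employee e
    join (
        select Name, Department
        from employee
        group by Name, Department
        having count(distinct ID) = 1
    ) g
    on e.Name = g.Name and e.Department = g.Department
""")
df.show()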

UPDATE 1: Here I duplicated MiKe's entry to make sure collect_set removes the duplicate ID when counting per Name and Department.

employeeDF = sqlContext.createDataFrame([('1235', 'Hary', 'IT'),
                                         ('879', 'Jack', 'PTA'),
                                         ('32569', 'Hary', 'IT'),
                                         ('4598', 'MiKe', 'HR'),
                                         ('4598', 'MiKe', 'HR')],
                                        ['ID', 'Name', 'Department'])
employeeDF.show()
employeeDF.createOrReplaceTempView("employee")

from pyspark.sql import functions as F, Window

# Compute count_id only (no filter yet), so the per-group counts are visible.
result = employeeDF.withColumn(
    'count_id',
    F.size(F.collect_set('ID').over(Window.partitionBy('Name', 'Department')))
)
result.show()

+-----+----+----------+--------+
|   ID|Name|Department|count_id|
+-----+----+----------+--------+
|  879|Jack|       PTA|       1|
| 4598|MiKe|        HR|       1|
| 4598|MiKe|        HR|       1|
|32569|Hary|        IT|       2|
| 1235|Hary|        IT|       2|
+-----+----+----------+--------+

MiKe's entries are duplicates with the same ID, so when grouping by (MiKe, HR) I need them to count as 1, because the ID is the same for that Name and Department.
Expected result:

+-----+----+----------+--------+
|   ID|Name|Department|count_id|
+-----+----+----------+--------+
|  879|Jack|       PTA|       1|
| 4598|MiKe|        HR|       1|
|32569|Hary|        IT|       2|
| 1235|Hary|        IT|       2|
+-----+----+----------+--------+

qhhrdooz #1

You can use size(collect_set()):

from pyspark.sql import functions as F, Window

result = employeeDF.withColumn(
    'count_id', 
    F.size(F.collect_set('ID').over(Window.partitionBy('Name', 'Department')))
).filter('count_id = 1').drop('count_id').distinct()

result.show()
+----+----+----------+
|  ID|Name|Department|
+----+----+----------+
| 879|Jack|       PTA|
|4598|MiKe|        HR|
+----+----+----------+
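The window counts the distinct IDs per (Name, Department) pair, the filter keeps only the pairs with exactly one distinct ID, and the trailing distinct() collapses exact duplicate rows such as the repeated MiKe entry from the update, which is why only one MiKe row remains in the output.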

eoigrqb6 #2

You can use not exists in Spark SQL:

df = sqlContext.sql("""
select * 
from employee e1 
where not exists (
            select 1 
            from employee e2 
            where e1.Department = e2.Department 
            and e1.Name = e2.Name and e1.ID != e2.ID
            )
""")

df.show()

# +----+----+----------+---+
# |  ID|Name|Department|Tag|
# +----+----+----------+---+
# |4598|MiKe|        HR|  Y|
# | 879|Jack|       PTA|  R|
# +----+----+----------+---+
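For reference, an equivalent DataFrame-API formulation of the same anti-join idea (my own sketch, not part of the original answer; it assumes the first employeeDF, the one that still has the Tag column) is a left_anti self-join:

from pyspark.sql import functions as F

e1 = employeeDF.alias('e1')
e2 = employeeDF.alias('e2')

# Keep rows of e1 for which no other row shares the same Name and
# Department but carries a different ID (same predicate as NOT EXISTS).
df = e1.join(
    e2,
    (F.col('e1.Name') == F.col('e2.Name'))
    & (F.col('e1.Department') == F.col('e2.Department'))
    & (F.col('e1.ID') != F.col('e2.ID')),
    'left_anti'
)
df.show()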
