在pyspark sql中用dropduplicates替换sql group by?

nukf8bse  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(323)

我无法运行以下sql查询,因为在查询中,我选择的某些列不在groupby中。这个查询在talend中运行,但我无法在sparksql中运行它。以下是主要查询:

SELECT a.*
FROM
  (SELECT a1.ID_BU,
          a1.Nama,
          a1.ID_Bentuk_BU,
          a1.id_bentuk_usaha,
          a1.ID_Jenis_BU,
          a1.ID_Jenis_BU_kbli,
          a1.Alamat,
          a1.Kodepos,
          a1.Telepon,
          a1.Fax,
          a1.Email,
          a1.website,
          a1.ID_Kabupaten,
          a1.ID_Propinsi,
          a1.NPWP,
          a1.no_spt AS modal_dasar,
          a1.Log,
          a2.BU_Nomor
   FROM bu a1,
        bu_nomor a2
   WHERE a1.`ID_BU`=a2.`ID_BU`
     AND a1.`ID_Propinsi`=a2.`id_Propinsi` ) AS a,

  (SELECT b.ID_BU,
          b.id_sub_klasifikasi_kbli,
          b.kualifikasi_kbli,
          b.ID_Asosiasi_BU,
          b.Propinsi,
          b.tgl_permohonan,
          c.tgl_habis
   FROM
     (SELECT b1.ID_BU,
             b1.id_sub_klasifikasi_kbli,
             b1.kualifikasi_kbli,
             b1.ID_Asosiasi_BU,
             b1.Propinsi,
             b1.tgl_permohonan
      FROM bu_registrasi_history_kbli b1
      WHERE b1.id_status='4'
        AND b1.Tgl_proses<'2018-03-01' )AS b,

     (SELECT c1.ID_BU,
             c1.id_klasifikasi,
             c1.ID_Asosiasi_BU,
             c1.tgl_habis
      FROM bu_sbu_kbli c1
      WHERE c1.tgl_habis>='2018-03-01' )AS c
   WHERE b.ID_BU=c.ID_BU
     AND SUBSTR(b.id_sub_klasifikasi_kbli, 1, 3)=c.id_klasifikasi
     AND b.ID_Asosiasi_BU=c.ID_Asosiasi_BU
   UNION ALL SELECT d.ID_BU,
                    d.id_sub_klasifikasi_kbli,
                    d.kualifikasi_kbli,
                    d.ID_Asosiasi_BU,
                    d.Propinsi,
                    d.tgl_permohonan,
                    e.tgl_habis
   FROM
     (SELECT d1.ID_BU,
             d1.id_sub_klasifikasi_kbli,
             d1.kualifikasi_kbli,
             d1.ID_Asosiasi_BU,
             d1.Propinsi,
             d1.tgl_permohonan
      FROM bu_registrasi_history_kbli_hapus d1
      WHERE d1.id_status='4'
        AND d1.Tgl_proses<'2018-03-01' )AS d,

     (SELECT e1.ID_BU,
             e1.id_klasifikasi,
             e1.ID_Asosiasi_BU,
             e1.tgl_habis
      FROM bu_sbu_kbli_hapus e1
      WHERE e1.tgl_habis>='2018-03-01' )AS e
   WHERE d.ID_BU=e.ID_BU
     AND SUBSTR(d.id_sub_klasifikasi_kbli, 1, 3)=e.id_klasifikasi
     AND d.ID_Asosiasi_BU=e.ID_Asosiasi_BU
   GROUP BY d.ID_BU,
            d.id_sub_klasifikasi_kbli
   ORDER BY tgl_habis,
            tgl_permohonan DESC) x1
WHERE a.ID_BU=x1.ID_BU
GROUP BY x1.ID_BU

由于groupby不允许我在sparksql中执行上述查询,因此我删除了groupby,并在生成的Dataframe中使用了dropduplicates。以下是修改后的代码:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .enableHiveSupport()\
    .getOrCreate()
results = spark.sql("""
SELECT a.*
FROM
  (SELECT a1.ID_BU,
          a1.Nama,
          a1.ID_Bentuk_BU,
          a1.id_bentuk_usaha,
          a1.ID_Jenis_BU,
          a1.ID_Jenis_BU_kbli,
          a1.Alamat,
          a1.Kodepos,
          a1.Telepon,
          a1.Fax,
          a1.Email,
          a1.website,
          a1.ID_Kabupaten,
          a1.ID_Propinsi,
          a1.NPWP,
          a1.no_spt AS modal_dasar,
          a1.Log,
          a2.BU_Nomor
   FROM lpjk_dwh.new_lpjk_bu a1,
        lpjk_dwh.new_lpjk_bu_nomor a2
   WHERE a1.`ID_BU`=a2.`ID_BU`
     AND a1.`ID_Propinsi`=a2.`id_Propinsi` ) AS a
""")

results1 = spark.sql("""
Select x1.* from (SELECT b.ID_BU,
          b.id_sub_klasifikasi_kbli,
          b.kualifikasi_kbli,
          b.ID_Asosiasi_BU,
          b.Propinsi,
          b.tgl_permohonan,
          c.tgl_habis
   FROM
     (SELECT b1.ID_BU,
             b1.id_sub_klasifikasi_kbli,
             b1.kualifikasi_kbli,
             b1.ID_Asosiasi_BU,
             b1.Propinsi,
             b1.tgl_permohonan
      FROM lpjk_dwh.new_lpjk_bu_registrasi_history_kbli b1
      WHERE b1.id_status='4'
        AND b1.Tgl_proses<'2018-03-01' )AS b,

     (SELECT c1.ID_BU,
             c1.id_klasifikasi,
             c1.ID_Asosiasi_BU,
             c1.tgl_habis
      FROM lpjk_dwh.new_lpjk_bu_sbu_kbli c1
      WHERE c1.tgl_habis>='2018-03-01' )AS c
   WHERE b.ID_BU=c.ID_BU
     AND SUBSTR(b.id_sub_klasifikasi_kbli, 1, 3)=c.id_klasifikasi
     AND b.ID_Asosiasi_BU=c.ID_Asosiasi_BU
   UNION ALL SELECT d.ID_BU,
                    d.id_sub_klasifikasi_kbli,
                    d.kualifikasi_kbli,
                    d.ID_Asosiasi_BU,
                    d.Propinsi,
                    d.tgl_permohonan,
                    e.tgl_habis
   FROM
     (SELECT d1.ID_BU,
             d1.id_sub_klasifikasi_kbli,
             d1.kualifikasi_kbli,
             d1.ID_Asosiasi_BU,
             d1.Propinsi,
             d1.tgl_permohonan
      FROM lpjk_dwh.new_lpjk_bu_registrasi_history_kbli_hapus d1
      WHERE d1.id_status='4'
        AND d1.Tgl_proses<'2018-03-01' )AS d,

     (SELECT e1.ID_BU,
             e1.id_klasifikasi,
             e1.ID_Asosiasi_BU,
             e1.tgl_habis
      FROM lpjk_dwh.new_lpjk_bu_sbu_kbli_hapus e1
      WHERE e1.tgl_habis>='2018-03-01' )AS e
   WHERE d.ID_BU=e.ID_BU
     AND SUBSTR(d.id_sub_klasifikasi_kbli, 1, 3)=e.id_klasifikasi
     AND d.ID_Asosiasi_BU=e.ID_Asosiasi_BU
   ORDER BY tgl_habis,
            tgl_permohonan DESC) x1

""")

results2 = results1.dropDubplicates(['id_bu', 'id_sub_klasifikasi_kbli'])

result3 = results.join(results2, "id_bu")
new_result = result3.dropDubplicates(["id_bu"])
new_result.write.mode("overwrite").saveAsTable("lpjk_dwh.thirdset")

我能换成那样吗?如果这两个查询给出相同的结果?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题