PySpark query and SQL: counting rows containing a word, per day

wkyowqbh  asked on 2021-05-29  in  Spark

Hey, I have a DataFrame whose rows contain the following columns: date and text. I need to find out how many rows per day contain the word "corona" (both the DataFrame way and the SQL way).
The word corona needs to be a standalone word, not a substring, and if there is a punctuation mark next to the word I also need to count it.
I started by removing the punctuation from the text column, then added an indicator column called check to mark whether the row contains the word corona, and then summed the check column grouped by the dates column.

1. Is this the right way to do it?
2. I tried converting it to a PySpark SQL query (with this approach the check column needs to be added in SQL code), but the results are very different, so how do I convert it?

The DataFrame way:

# above I defined the punctuation-removal UDF and read the data into df

df = df.withColumn('no_punc_text', punc_udf('text'))
df = df.select('no_punc_text', 'dates')
df.registerTempTable('my_table')
df = df.withColumn("check", F.col("no_punc_text").rlike("corona " or " corona" or " corona ").cast("Integer"))
dfway = df.groupBy("dates").sum('check')
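One likely source of the mismatch (an editorial observation, not part of the original question): in Python, the expression `"corona " or " corona" or " corona "` is evaluated before `rlike()` ever sees it, and `or` between non-empty strings simply returns its first operand, so only the single pattern `"corona "` is actually tested:

```python
# 'or' between non-empty strings returns the first operand,
# so rlike() above only ever receives the pattern "corona "
pattern = "corona " or " corona" or " corona "
print(pattern)  # -> 'corona '

# to test all three alternatives in a single rlike() call,
# use regex alternation instead
alternation = "corona | corona| corona "
```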
The SQL way:
sqlw = spark.sql(
      """
        select dates, sum(
         case when (no_punc_text rlike ' corona') then 1 
         when (no_punc_text rlike ' corona') then 1 
         when (no_punc_text rlike ' corona ') then 1 else 0 end
        ) as check
        from my_table group by dates
      """)

Answer #1 (pkbketx9)

Use a word boundary (\b), as shown below.

Load the test data:

val df = Seq("corona", "corona?", "this is corona", "coronavirus", "corona's", "is this corona?")
  .toDF("text")
  .withColumn("dates", monotonically_increasing_id())
df.show(false)
df.printSchema()

    /**
      * +---------------+-----+
      * |text           |dates|
      * +---------------+-----+
      * |corona         |0    |
      * |corona?        |1    |
      * |this is corona |2    |
      * |coronavirus    |3    |
      * |corona's       |4    |
      * |is this corona?|5    |
      * +---------------+-----+
      *
      * root
      * |-- text: string (nullable = true)
      * |-- dates: long (nullable = false)
      */

Detect the word corona according to this requirement from the question:

The word corona needs to be a standalone word, not a substring, and if there is a punctuation mark next to the word I also need to count it.

df.createOrReplaceTempView("my_table")

spark.sql(
  """
    | select dates, sum(
    |         case when (text rlike '\\bcorona\\b') then 1
    |         else 0 end
    |        ) as check
    |        from my_table group by dates
  """.stripMargin)
  .show(false)

    /**
      * +-----+-----+
      * |dates|check|
      * +-----+-----+
      * |2    |1    |
      * |4    |1    |
      * |5    |1    |
      * |0    |1    |
      * |1    |1    |
      * |3    |0    |
      * +-----+-----+
      */

Note that the string coronavirus is not detected as corona, since you don't want substrings to be counted.
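The word-boundary behavior can be verified outside Spark with plain Python re (an editorial sketch; Spark's rlike uses Java regex, but \b behaves the same for these inputs):

```python
import re

# the same six test strings as in the answer above
samples = ["corona", "corona?", "this is corona",
           "coronavirus", "corona's", "is this corona?"]

# \b matches at a word/non-word transition, so punctuation next to
# 'corona' still counts, while 'coronavirus' has no boundary after 'corona'
checks = [1 if re.search(r"\bcorona\b", s) else 0 for s in samples]
print(checks)  # -> [1, 1, 1, 0, 1, 1]
```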

In Python:

sqlw = spark.sql(
      """
         select dates, sum(
          case when (text rlike '\\bcorona\\b') then 1
          else 0 end
         ) as check
         from my_table group by dates
      """)

Answer #2 (kqqjbcuj)

I can help you with the PySpark part. It is best to avoid UDFs; there is almost always an equivalent built-in function. In your case, the Column function contains() is helpful. Reference: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=contain#pyspark.sql.column.contains
Consider a test DataFrame:

test_df = sqlContext.createDataFrame(
    ["stay safe",
     "lets make the world coronafree",
     "corona spreads through contact",
     "there is no vaccine yet for corona,but is in progress",
     "community has to unite against corona."],
    "string").toDF('text')
test_df.show(truncate=False)

+-----------------------------------------------------+
|text                                                 |
+-----------------------------------------------------+
|stay safe                                            |
|lets make the world coronafree                       |
|corona spreads through contact                       |
|there is no vaccine yet for corona,but is in progress|
|community has to unite against corona.               |
+-----------------------------------------------------+

test_df_f = test_df.where(F.col('text').contains('corona'))
test_df_f.show()
+-----------------------------------------------------+
|text                                                 |
+-----------------------------------------------------+
|lets make the world coronafree                       |
|corona spreads through contact                       |
|there is no vaccine yet for corona,but is in progress|
|community has to unite against corona.               |
+-----------------------------------------------------+
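As a sanity check outside Spark (an editorial addition, not part of the original answer), Python's substring operator `in` behaves like Column.contains() on these rows, which also shows why contains() alone is not a whole-word match:

```python
# substring check, analogous to Column.contains('corona')
samples = [
    "stay safe",
    "lets make the world coronafree",
    "corona spreads through contact",
    "there is no vaccine yet for corona,but is in progress",
    "community has to unite against corona.",
]
contains_corona = [s for s in samples if "corona" in s]

# 4 of the 5 rows match; "stay safe" is filtered out, but the
# substring match (wrongly, per the question) keeps "coronafree"
print(len(contains_corona))  # -> 4
```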

You can see that all the punctuation has been handled. With this filtered DataFrame test_df_f you can run a count to get the number of rows directly, or do any other aggregation by date for further analysis.
If you need to match the whole word, you can use the following:

test_df_f_whole = test_df.where("text RLIKE '\\\\bcorona\\\\b'")
test_df_f_whole.show(truncate=False)

+-----------------------------------------------------+
|text                                                 |
+-----------------------------------------------------+
|corona spreads through contact                       |
|there is no vaccine yet for corona,but is in progress|
|community has to unite against corona.               |
+-----------------------------------------------------+
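A note on the escaping in that expression (an editorial observation, assuming Spark's default string-literal escaping): each \b needs four backslashes in the Python source, because Python halves them to two, and Spark's SQL parser halves them again before the pattern reaches the regex engine:

```python
# the Python literal '\\\\b' contains two actual backslash characters
sql_text = "text RLIKE '\\\\bcorona\\\\b'"
print(sql_text)      # -> text RLIKE '\\bcorona\\b'

# Spark's SQL parser then unescapes '\\' to '\', so the regex engine
# finally sees \bcorona\b (a word-boundary match); simulated here:
spark_sees = sql_text.replace("\\\\", "\\")
print(spark_sees)    # -> text RLIKE '\bcorona\b'
```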

Ref: https://html.developreference.com/article/12239248/how+to+use+word+boundary+in+rlike+in+pyspark+sql+dataframes
