sparksql-提取多个regex匹配(仅使用sql)

5ktev3wc  于 2021-07-24  发布在  Spark
关注(0)|答案(4)|浏览(1161)

我有一个原始文本中的sql查询数据集,还有一个包含所有可能表名的正则表达式:


# queries

+-----+----------------------------------------------+
| id  | query                                        |                  
+-----+----------------------------------------------+
| 1   | select * from table_a, table_b               |
| 2   | select * from table_c join table_d...        |
+-----+----------------------------------------------+

# regexp

'table_a|table_b|table_c|table_d'

我希望得到以下结果:


# expected result

+-----+----------------------------------------------+
| id  | tables                                       |                  
+-----+----------------------------------------------+
| 1   | [table_a, table_b]                           |
| 2   | [table_c, table_d]                           |
+-----+----------------------------------------------+

但是在spark中使用下面的sql,我得到的只是第一个匹配。。。

select
  id,
  regexp_extract(query, 'table_a|table_b|table_c|table_d') as tables
from queries

# actual result

+-----+----------------------------------------------+
| id  | tables                                       |                  
+-----+----------------------------------------------+
| 1   |  table_a                                     |
| 2   |  table_c                                     |
+-----+----------------------------------------------+

有没有办法只用sparksql来实现这一点?这就是我正在使用的函数https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/sql/#regexp_extract
编辑
我也会接受返回以下内容的解决方案:


# alternative solution

+-----+----------------------------------------------+
| id  | tables                                       |                  
+-----+----------------------------------------------+
| 1   | table_a                                      |
| 1   | table_b                                      |
| 2   | table_c                                      |
| 2   | table_d                                      |
+-----+----------------------------------------------+

解决方案
@chlebek在下面解决了这个问题。为了提高可读性,我使用CTE重新格式化了他的sql:

with
split_queries as (
  select 
    id, 
    explode(split(query, ' ')) as col
  from queries
),
extracted_tables as (
  select 
    id, 
    regexp_extract(col, 'table_a|table_b|table_c|table_d', 0) as rx
  from split_queries
)
select
  id,
  collect_set(rx) as tables
from extracted_tables
where rx != ''
group by id

请记住 split(query, ' ') 部分查询将仅按空格分割sql。如果你有其他事情,如制表符,换行符,注解等,你应该处理这些之前或分裂时。

ttp71kqs

ttp71kqs1#

当您使用sparksql时,您可以使用sql解析器&它将为您完成这项工作。

def getTables(query: String): Seq[String] = {
  val logicalPlan = spark.sessionState.sqlParser.parsePlan(query)
  import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
  logicalPlan.collect { case r: UnresolvedRelation => r.tableName }
}

val query = "select * from table_1 as a left join table_2 as b on 
a.id=b.id"
scala> getTables(query).foreach(println)
table_1
table_2

您可以将“gettables”注册为udf并在查询中使用

deikduxw

deikduxw2#

您可以使用spark中的另一个sql函数collect\u listhttps://docs.databricks.com/spark/latest/spark-sql/language-manual/functions.html#collect_list. 你可以再找一个样品https://mungingdata.com/apache-spark/arraytype-columns/
基本上,应用到代码中

val df = spark.sql("select 1 id, 'select * from table_a, table_b' query" )
val df1 = spark.sql("select 2 id, 'select * from table_c join table_d' query" )

val df3 = df.union(df1)

df3.createOrReplaceTempView("tabla")

   spark.sql("""
select id, collect_list(tables) from (
        select id, explode(split(query, ' ')) as tables
        from tabla)
where tables like 'table%' group by id""").show

输出将是

+---+--------------------+
| id|collect_list(tables)|
+---+--------------------+
|  1| [table_a,, table_b]|
|  2|  [table_c, table_d]|
+---+--------------------+

希望这有帮助

ndh0cuux

ndh0cuux3#

如果你在 spark>=2.4 然后你可以移除 exploding 以及 collecting the same 在数组上使用高阶函数而不使用任何子查询的操作-

加载测试数据

val data =
      """
        |id  | query
        |1   | select * from table_a, table_b
        |2   | select * from table_c join table_d on table_c.id=table_d.id
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";"))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ";")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)
    df.printSchema()
    df.show(false)

    /**
      * root
      * |-- id: integer (nullable = true)
      * |-- query: string (nullable = true)
      *
      * +---+-----------------------------------------------------------+
      * |id |query                                                      |
      * +---+-----------------------------------------------------------+
      * |1  |select * from table_a, table_b                             |
      * |2  |select * from table_c join table_d on table_c.id=table_d.id|
      * +---+-----------------------------------------------------------+
      */

从查询中提取表

// spark >= 2.4.0
    df.createOrReplaceTempView("queries")
    spark.sql(
      """
        |select id,
        |   array_distinct(
        |   FILTER(
        |     split(query, '\\.|=|\\s+|,'), x -> x rlike 'table_a|table_b|table_c|table_d'
        |     )
        |    )as tables
        |FROM
        |   queries
      """.stripMargin)
      .show(false)

    /**
      * +---+------------------+
      * |id |tables            |
      * +---+------------------+
      * |1  |[table_a, table_b]|
      * |2  |[table_c, table_d]|
      * +---+------------------+
      */
von4xj4u

von4xj4u4#

如果只需要检查几个值,则可以使用 contains 函数而不是regexp:

val names = Seq("table_a","table_b","table_c","table_d")
def c(col: Column) = names.map(n => when(col.contains(n),n).otherwise(""))
df.select('id,array_remove(array(c('query):_*),"").as("result")).show(false)

但使用regexp时,它将如下所示(spark sql api):

df.select('id,explode(split('query," ")))
.select('id,regexp_extract('col,"table_a|table_b|table_c|table_d",0).as("rx"))
.filter('rx=!="")
.groupBy('id)
.agg(collect_list('rx))

可以转换为以下sql查询:

select id, collect_list(rx) from
    (select id, regexp_extract(col,'table_a|table_b|table_c|table_d',0) as rx from
        (select id, explode(split(query,' ')) as col from df) q1
    ) q2
where rx != '' group by id

因此,输出将是:

+---+------------------+
| id|  collect_list(rx)|
+---+------------------+
|  1|[table_a, table_b]|
|  2|[table_c, table_d]|
+---+------------------+

相关问题