scala—在数组中选择一系列元素

z0qdvdin  于 2021-05-27  发布在  Spark
关注(0)|答案(8)|浏览(443)

我用 spark-shell 执行以下操作。
最近在sparksql中加载了一个带有数组列的表。
以下是相同的ddl:

create table test_emp_arr{
    dept_id string,
    dept_nm string,
    emp_details Array<string>
}

数据看起来像这样

+-------+-------+-------------------------------+
|dept_id|dept_nm|                     emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+

我可以这样查询emp\u details列:

sqlContext.sql("select emp_details[0] from emp_details").show

问题
我要查询集合中的一系列元素:
需要查询才能工作

sqlContext.sql("select emp_details[0-2] from emp_details").show

或者

sqlContext.sql("select emp_details[0:2] from emp_details").show

预期产量

+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

在纯scala中,如果我有一个数组:

val emp_details = Array("Jon","Snow","Castle","Black")

我可以使用

emp_details.slice(0,3)

还给我

Array(Jon, Snow,Castle)

我无法在spark sql中应用数组的上述操作。
谢谢

6vl6ewon

6vl6ewon1#

在ApacheSpark中使用selecrexpr()和split()函数。
例如:

fs.selectExpr("((split(emp_details, ','))[0]) as e1,((split(emp_details, ','))[1]) as e2,((split(emp_details, ','))[2]) as e3);
tgabmvqs

tgabmvqs2#

因为spark 2.4可以使用 slice 功能。在python中):

pyspark.sql.functions.slice(x, start, length)

collection函数:返回一个数组,该数组包含从索引开始(如果开始为负,则从结尾开始)起x中具有指定长度的所有元素。
...
版本2.4中的新功能。

from pyspark.sql.functions import slice

df = spark.createDataFrame([
    (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
    (20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))

df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

在斯卡拉

def slice(x: Column, start: Int, length: Int): Column

返回一个数组,该数组包含从索引开始(如果开始为负,则从结尾开始)起x中具有指定长度的所有元素。

import org.apache.spark.sql.functions.slice

val df = Seq(
    (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
    (20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")

df.select(slice($"emp_details", 1, 3) as "empt_details").show
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

同样的事情当然可以在sql中完成

SELECT slice(emp_details, 1, 3) AS emp_details FROM df

重要提示:
请注意,这与 Seq.slice ,值从零开始索引,第二个参数是长度,而不是结束位置。

yeotifhr

yeotifhr3#

使用嵌套拆分: split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') ```
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673

scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df.createOrReplaceTempView("raw_data")

scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+--------------------+
| 10|Finance|[Jon, Snow, Castl...|
| 20| IT| [Ned, is, no, more]|
+-------+-------+--------------------+

scala> val df2 = spark.sql(
| s"""
| |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
| """)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+-------------------+
| 10|Finance|[Jon, Snow, Castle]|
| 20| IT| [Ned, is, no]|
+-------+-------+-------------------+

uxh89sit

uxh89sit4#

你可以使用这个函数 array 要使用这三个值构建新数组,请执行以下操作:

import org.apache.spark.sql.functions._

val input = sqlContext.sql("select emp_details from emp_details")

val arr: Column = col("emp_details")
val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")

val result.show()
// +-------------------+
// |        emp_details|
// +-------------------+
// |[Jon, Snow, Castle]|
// |      [Ned, is, no]|
// +-------------------+
h5qlskok

h5qlskok5#

这是我的通用切片自定义项,支持任何类型的数组。有点难看,因为您需要提前知道元素类型。

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
  if (arr == null) null else arr.slice(from, until)

def slice(elemType: DataType): UserDefinedFunction = 
  udf(arraySlice _, ArrayType(elemType)

fs.select(slice(StringType)($"emp_details", 1, 2))
vnjpjtjt

vnjpjtjt6#

edit2:谁想以牺牲可读性为代价来避免udf;-)
如果您真的想在一个步骤中完成,那么必须使用scala创建一个lambda函数,返回 Column 把它 Package 成一个数组。这有点牵扯,但只是一步:

val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")

df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)    

+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

这个 _:* 将一个列表传递给一个所谓的变量函数有点神奇( array 在本例中,它构造了sql数组)。但我建议不要按原样使用这个解决方案。将lambda函数放入命名函数中

def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*))

代码可读性。注意,一般来说,坚持 Column 表达式(不使用“udf”)具有更好的性能。
edit:为了在sql语句中完成它(正如您在问题中所问的……),遵循相同的逻辑,您将使用scala逻辑生成sql查询(并不是说它是最可读的)

def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => "emp_details["+i.toString+"]").mkString(",") + ")"
val sqlQuery = "select emp_details,"+ sliceSql("emp_details",0,3) + "as slice from emp_details"

sqlContext.sql(sqlQuery).show

+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

请注意,您可以替换 untilto 为了提供最后一个元素而不是迭代停止的元素。

qncylg1j

qncylg1j7#

下面是一个使用用户定义函数的解决方案,该函数的优点是可以处理任意大小的切片。它只是围绕scala内置函数构建一个udf函数 slice 方法:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))

以您的数据为例:

val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show

产生预期的输出

+--------------------+-------------------+
|         emp_details|              slice|
+--------------------+-------------------+
|[Jon, Snow, Castl...|[Jon, Snow, Castle]|
+--------------------+-------------------+

您也可以在您的帐户中注册自定义项 sqlContext 像这样使用

sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon‌​','Snow','Castle','Black','Ned'),0,3)")

你不需要 lit 再也不用用这个方法了

wydwbb8l

wydwbb8l8#

对于那些坚持使用spark<2.4并且没有 slice 函数,这里是pyspark中的一个解决方案(scala将非常类似),它不使用udf。相反,它使用sparksql函数 concat_ws , substring_index ,和 split .
这只适用于字符串数组。要使它与其他类型的数组一起工作,您必须首先将它们转换为字符串,然后在“切片”数组之后再转换回原始类型。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
    .master('yarn')
    .appName("array_slice")
    .getOrCreate()
)

emp_details = [
    ["Jon", "Snow", "Castle", "Black", "Ned"],
    ["Ned", "is", "no", "more"]
]

df1 = spark.createDataFrame(
    [tuple([emp]) for emp in emp_details],
    ["emp_details"]
)

df1.show(truncate=False)
+-------------------------------+
|emp_details                    |
+-------------------------------+
|[Jon, Snow, Castle, Black, Ned]|
|[Ned, is, no, more]            |
+-------------------------------+
last_string = 2

df2 = (
    df1
    .withColumn('last_string', (F.lit(last_string)))
    .withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
    .withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)" ))
    .withColumn('slice', F.split(F.col('slice'), ' '))
    .select('emp_details', 'slice')
)

df2.show(truncate=False)
+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
|[Ned, is, no, more]            |[Ned, is, no]      |
+-------------------------------+-------------------+

相关问题