我用 spark-shell
执行以下操作。
最近在sparksql中加载了一个带有数组列的表。
以下是相同的ddl:
create table test_emp_arr{
dept_id string,
dept_nm string,
emp_details Array<string>
}
数据看起来像这样
+-------+-------+-------------------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+-------------------------------+
| 10|Finance|[Jon, Snow, Castle, Black, Ned]|
| 20| IT| [Ned, is, no, more]|
+-------+-------+-------------------------------+
我可以这样查询emp\u details列:
sqlContext.sql("select emp_details[0] from emp_details").show
问题
我要查询集合中的一系列元素:
需要查询才能工作
sqlContext.sql("select emp_details[0-2] from emp_details").show
或者
sqlContext.sql("select emp_details[0:2] from emp_details").show
预期产量
+-------------------+
| emp_details|
+-------------------+
|[Jon, Snow, Castle]|
| [Ned, is, no]|
+-------------------+
在纯scala中,如果我有一个数组:
val emp_details = Array("Jon","Snow","Castle","Black")
我可以使用
emp_details.slice(0,3)
还给我
Array(Jon, Snow,Castle)
我无法在spark sql中应用数组的上述操作。
谢谢
8条答案
按热度按时间6vl6ewon1#
在ApacheSpark中使用selecrexpr()和split()函数。
例如:
tgabmvqs2#
因为spark 2.4可以使用
slice
功能。在python中):collection函数:返回一个数组,该数组包含从索引开始(如果开始为负,则从结尾开始)起x中具有指定长度的所有元素。
...
版本2.4中的新功能。
在斯卡拉
返回一个数组,该数组包含从索引开始(如果开始为负,则从结尾开始)起x中具有指定长度的所有元素。
同样的事情当然可以在sql中完成
重要提示:
请注意,这与
Seq.slice
,值从零开始索引,第二个参数是长度,而不是结束位置。yeotifhr3#
使用嵌套拆分:
split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')
```scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673
scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df.createOrReplaceTempView("raw_data")
scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+--------------------+
| 10|Finance|[Jon, Snow, Castl...|
| 20| IT| [Ned, is, no, more]|
+-------+-------+--------------------+
scala> val df2 = spark.sql(
| s"""
| |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
| """)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+-------------------+
| 10|Finance|[Jon, Snow, Castle]|
| 20| IT| [Ned, is, no]|
+-------+-------+-------------------+
uxh89sit4#
你可以使用这个函数
array
要使用这三个值构建新数组,请执行以下操作:h5qlskok5#
这是我的通用切片自定义项,支持任何类型的数组。有点难看,因为您需要提前知道元素类型。
vnjpjtjt6#
edit2:谁想以牺牲可读性为代价来避免udf;-)
如果您真的想在一个步骤中完成,那么必须使用scala创建一个lambda函数,返回
Column
把它 Package 成一个数组。这有点牵扯,但只是一步:这个
_:*
将一个列表传递给一个所谓的变量函数有点神奇(array
在本例中,它构造了sql数组)。但我建议不要按原样使用这个解决方案。将lambda函数放入命名函数中代码可读性。注意,一般来说,坚持
Column
表达式(不使用“udf”)具有更好的性能。edit:为了在sql语句中完成它(正如您在问题中所问的……),遵循相同的逻辑,您将使用scala逻辑生成sql查询(并不是说它是最可读的)
请注意,您可以替换
until
由to
为了提供最后一个元素而不是迭代停止的元素。qncylg1j7#
下面是一个使用用户定义函数的解决方案,该函数的优点是可以处理任意大小的切片。它只是围绕scala内置函数构建一个udf函数
slice
方法:以您的数据为例:
产生预期的输出
您也可以在您的帐户中注册自定义项
sqlContext
像这样使用你不需要
lit
再也不用用这个方法了wydwbb8l8#
对于那些坚持使用spark<2.4并且没有
slice
函数,这里是pyspark中的一个解决方案(scala将非常类似),它不使用udf。相反,它使用sparksql函数concat_ws
,substring_index
,和split
.这只适用于字符串数组。要使它与其他类型的数组一起工作,您必须首先将它们转换为字符串,然后在“切片”数组之后再转换回原始类型。