如何在具有数组类型列的值中添加索引

bd1hkmkf  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(275)

我需要向列(数组)中添加序列号,其中我的源数据是Parquet格式的,并且数量接近20亿条记录。在这里,我只能从parquet中选择key和code列,并将序号添加到ref\ u代码中,然后将其加载回s3

Key_1       Key_2       Key_3  Ref_codes
112240386   7435038894  2    [4659,53540,78907]
113325994   7940375640  1      [7232,7840,83969]
223352476   7765270324  4      [9999]
345936074   7950076012  1      [78650,4829,30000]

Key_1       Key_2       Key_3   Ref_codes
112240386   7435038894  2       [(4659,0),(53540,1),(78907,2)]
113325994   7940375640  1       [(7232,0),(7840,1),(83969,2)]
223352476   7765270324  4       [(9999,0)]
345936074   7950076012  1       [(78650,0),(4829,1),(30000,2)]

我是新来的scala,我尝试了多种选择,但没有得到正确的结果。任何帮助真的感谢。。。

njthzxwz

njthzxwz1#

您可以使用高阶函数,如 transform 在最新版本的Spark如下。数据:

val df = Seq(
  ("112240386", "7435038894", 2, Array(4659, 53540, 7890)),
  ("113325994", "7940375640", 1, Array(7232, 7840, 8396)),
  ("223352476", "7765270324", 4, Array(999)),
  ("345936074", "7950076012", 1, Array(78650, 4829, 3000)),
).toDF("key_1", "key_2", "key_3", "ref_code")

Spark3.0.0+

df.withColumn("ref_code", transform($"ref_code", (x, i) => struct(x, i) ))

Spark>2.4

df.withColumn("ref_code", expr("transform(ref_code, (x,i) -> (x,i) )"))

Spark<2.4

val addIndex = udf((arr: Seq[Int]) => arr.zipWithIndex)
df.withColumn("ref_code", addIndex($"ref_code")).show(false)

输出:

+---------+----------+-----+----------------------------------+
|key_1    |key_2     |key_3|ref_code                          |
+---------+----------+-----+----------------------------------+
|112240386|7435038894|2    |[[4659, 0], [53540, 1], [7890, 2]]|
|113325994|7940375640|1    |[[7232, 0], [7840, 1], [8396, 2]] |
|223352476|7765270324|4    |[[999, 0]]                        |
|345936074|7950076012|1    |[[78650, 0], [4829, 1], [3000, 2]]|
+---------+----------+-----+----------------------------------+

这里有更多关于变换的例子

相关问题