使用spark将非规范化配置单元表加载到elasticsearch中

vxf3dgd4  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(315)

所以,我找到了很多相反的答案,但不是这个。现在听起来很傻,因为elasticsearch只处理非规范化数据,但这就是我们的问题所在。我们有一个格式如下的表:

+----+--------+--------+--------+--------+---------+
| id | attr_1 | attr_2 | attr_3 | attr_4 | fst_nm  |
+----+--------+--------+--------+--------+---------+
|  1 |   2984 |   0324 |  38432 |        | john    |
|  2 |   2343 |  28347 | 238493 |  34923 | patrick |
|  3 |   3293 |   3823 |  38423 |  34823 | george  |
+----+--------+--------+--------+--------+---------+

如果attr\u x表示相同的确切内容,那么假设它们是另一个表的外键,而这个表在规范化的世界中是分离的。所以,所有的 attrs ,存在于单独的表中。然而,这些表被反序列化,它们都被转储到一个长表中。通常情况下,加载到elasticsearch并不是一个太大的问题,但是这个表非常庞大,大约有1000多列。我们想要这些 attrs 并将它们存储为elasticsearch中的数组,如下所示:

_source: {
  "id": 1,
  "fst_nm": "john",
  "attrs": [
    2984,
    0324,
    38432
  ]
}

而不是:

_source: {
  "id": 1,
  "fst_nm": "john",
  "attr_1": 2984,
  "attr_2": 0324,
  "attr_3": 38432
}

当我们使用默认的spark进程时,它只创建底部的elasticsearch文档。我的一些想法是创建一个新的 attrs 并取消对它们的IVOT,然后按id查询该表,以获取属性,因此看起来像这样:

+-----+--------+
| id  |  attr  |
+-----+--------+
|   1 |   2984 |
|   1 |   0324 |
|   1 |  38432 |
|   2 |   2343 |
| ... |    ... |
|   3 |  34823 |
+-----+--------+

然后我们可以使用sparksql在这个新创建的表上按id进行查询,获得attrs,但是我们如何使用spark将其作为数组插入elasticsearch中呢?
我的另一个想法是在hive中创建一个新表,并将attrs更改为hive复杂类型的数组,但我不知道该怎么做。另外,如果我们使用spark在hive中查询表,当结果以数组的形式返回时,是否很容易转储到elasticsearch中?

ryoqjall

ryoqjall1#

至于数据转换部分,您可以使用 array 将多个列收集为一个数组,然后可以使用 .write.json("jsonfile") 要写入json文件:

import org.apache.spark.sql.functions.col
val attrs = df.columns.filter(_.startsWith("attr")).map(col(_))

val df_array = df.withColumn("attrs", array(attrs:_*)).select("id", "fst_nm", "attrs")

df_array.toJSON.collect
//res8: Array[String] = Array({"id":1,"fst_nm":"john","attrs":[2984,324,38432,null]}, {"id":2,"fst_nm":"patrick","attrs":[2343,28347,238493,34923]})

写入文件:

df_array.write.json("/PATH/TO/jsonfile")

相关问题