dataframe—使用scala和spark将具有巨大值的map类型列拆分为多行

yuvru6vn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(739)

这是问题的继续:通过tuple2的键将tuple2的值部分(一个Map)组合成单个Map分组
我现在可以使用 reduceByKey .
但现在,在决赛中 DataFrame ...
例如

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900, 6-> 111, 7-> 222, 8-> 333, 12-> 444, 13->555, 19->666})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

…有些行的map列非常大。例如 B 上面。
我正在尝试将df写入azure cosmos db core sql。在这里,上面df的每一行变成cosmos db的一个文档。问题是如果行大小超过2mb,那么COSMOSDB将拒绝该请求。
问:我想将具有巨大Map列的行拆分为多行(以便它们的大小小于2mb)。重复键列不是问题。
最终的结果可能是(如果每次都有5个以上的元素,我就把Map分割开来):

(A, {1->100, 2->200, 3->100})
(B, {4->300, 1->500, 9->300, 11->900, 5->900})
(B, {6-> 111, 7-> 222, 8-> 333, 12-> 444, 13->555})
(B, {19->666})
(C, {6->100, 4->200, 7->100, 8->200, 5->800})

你可能会问,在上一个问题,它已经分裂,那么我为什么要合并?原因是在上一个问题中,对于b,没有reducebykey,我可能有1000行。但是,最后我只需要20行,例如上面的例子。一行本来是理想的,但由于宇宙的限制,我必须创建多个文件(每个小于2mb)。
希望我明白。请让我知道任何需要的澄清。

v8wbuo2f

v8wbuo2f1#

我可以通过编写自己的自定义代码来解决这个问题,如下所示:

originalDF.rdd.reduceByKey((a, b) => a ++ b).map(row => {
  val indexedMapEntries: Map[Int, (String, String)] = row._2.zipWithIndex.map(mapWithIndex => (mapWithIndex._2, mapWithIndex._1))
  var min = 0
  var max = Math.min(indexedMapEntries.size - 1, 9999)
  var proceed = true

  var rowKeyIdToAttributesMapList: ListBuffer[(String, Map[String, String])] = new ListBuffer[(String, Map[String, String])]()

  while (proceed) {

    var tempMapToHoldEntries = Map[String, String]()
    var i = min
    while (i <= max) {
      var entry: (String, String) = indexedMapEntries.get(i).get
      tempMapToHoldEntries += entry
      i = i + 1
    }
    rowKeyIdToAttributesMapList += ((row._1, tempMapToHoldEntries))
    min = max + 1
    max = Math.min(indexedMapEntries.size - 1, max + 9999)
    if (min > (indexedMapEntries.size - 1))
      proceed = false

  }

  rowKeyIdToAttributesMapList.toList

}).flatMap(x => x).toDF("rowKeyId", "attributes")

这里,那个 originalDF 是我上一个问题中的一个(检查操作)。 10000 是每个Map的最大大小 rowKeyId . 如果Map大小超过10000,那么我将创建一个具有相同rowkeyid和循环中剩余属性的新行。

相关问题