如何按列的值分组处理Spark DataFrame

r7xajy2e  于 2023-08-06  发布在  Apache
关注(0)|答案(1)|浏览(148)

我有一个DataFrame,需要将其写入我们的自定义数据存储。此数据存储要求根据特定列的(account_id)值按组写入数据。例如,给定此数据:

account_id | date       | value
         1 | 2023-01-01 | 1
         1 | 2023-01-02 | 2
         2 | 2023-01-01 | 3

字符串
我需要将account_id为1的组与account_id为2的组分开处理。
我使用df.repartition(df.col("account_id")).foreachPartition(processPartition)(在Scala中)进行了尝试
我希望我的单元测试中的模拟数据存储接收2个调用,一个是account_id为1的2行,另一个是account_id为2的1行,但它只接收到一个包含所有3行的写调用。这就好像Spark忽略了我的分区要求。
阅读了这篇文章,我得到的印象是分区纯粹是一种性能工具(用于控制如何使用并行性),Spark在这里决定甚至没有理由分割微小的框架。然而,我找不到任何文件可以清楚地说明这一点。我的问题是,在我的情况下,这不是关于性能,而是关于正确性。我也找不到任何文件告诉我该怎么做。我遇到了DataFrameWriter的partitionBy,但是这个文档都是关于磁盘上的文件的,而我需要使用我已经编写的客户端库写入我们的自定义数据存储。
最后,我在StackOverflow上发现了类似的问题,但他们明确提到DataFrame很小,性能不是问题,所以公认的答案是首先获取不同值的列表,然后迭代并为这些值创建新的DataFrames过滤alaaccount_ids.map(id => (df.where(($"account_id" === id)))),但这将击败Spark的隐式并行性。

ssgvzors

ssgvzors1#

分区可以保证所有具有相同account_id的帐户最终都在同一个分区中,但不能保证单个分区只包含一个account_id。通常情况下,account_id的数量会比分区的数量多,所以Spark没有机会为每个account_id创建一个分区。如果在调用repartition时省略分区数,Spark将使用默认的分区数(spark.sql.shuffle.partitions)。
但是,您可以按键对每个分区中的行进行排序(不需要额外的 Shuffle ),然后单独处理每个键:

//create testdata with 8 keys distributed over 5 partitions
val df = spark.createDataset(for( x <- 1 to 100 ) yield (x%8, x))
  .toDF("key", "value")
  .repartition(5, 'key)

//the method handling the data store
def doSomething(in:List[Row]): Unit = {
  if( in.length > 0 ) {
    println(in) //replace this line with whatever code should work only on a single account_id
  }
}

//processing the partitions in parallel
df.sortWithinPartitions("key")
  .foreachPartition{it:Iterator[Row] => {
    var curKey:Option[Int] = Option.empty
    var curList: ListBuffer[Row] = ListBuffer()
    while(it.hasNext) {
      val row = it.next()
      val k = row.getAs[Int]("key")
      if( curKey == Some(k) ) {
        curList += row
      }
      else {
        doSomething(curList.toList)
        curList = ListBuffer()
        curList += row
        curKey = Some(k)
      }
    }
    doSomething(curList.toList)
  }
}

字符串
输出量:

List([1,1], [1,9], [1,17], [1,25], [1,33], [1,41], [1,49], [1,57], [1,65], [1,73], [1,81], [1,89], [1,97])
List([2,2], [2,10], [2,18], [2,26], [2,34], [2,42], [2,50], [2,58], [2,66], [2,74], [2,82], [2,90], [2,98])
List([6,6], [6,14], [6,22], [6,30], [6,38], [6,46], [6,54], [6,62], [6,70], [6,78], [6,86], [6,94])
List([0,8], [0,16], [0,24], [0,32], [0,40], [0,48], [0,56], [0,64], [0,72], [0,80], [0,88], [0,96])
List([3,3], [3,11], [3,19], [3,27], [3,35], [3,43], [3,51], [3,59], [3,67], [3,75], [3,83], [3,91], [3,99])
List([5,5], [5,13], [5,21], [5,29], [5,37], [5,45], [5,53], [5,61], [5,69], [5,77], [5,85], [5,93])
List([4,4], [4,12], [4,20], [4,28], [4,36], [4,44], [4,52], [4,60], [4,68], [4,76], [4,84], [4,92], [4,100])
List([7,7], [7,15], [7,23], [7,31], [7,39], [7,47], [7,55], [7,63], [7,71], [7,79], [7,87], [7,95])


输出中的每一行都是通过对doSomething的一次调用创建的,并且只包含具有唯一键的行,同时仍然并行处理分区。

相关问题