pyspark 关于Spark中 Dataframe 分区一致性/安全性的问题

pxy2qtax  于 2023-04-19  发布在  Spark
关注(0)|答案(1)|浏览(133)

我在玩Spark,我想尝试找到一种仅限 Dataframe 的方法,将连续的升序键分配给 Dataframe 行,以最大限度地减少数据移动。我发现了一种两遍的解决方案,可以从每个分区获取计数信息,并使用它来生成每个分区中的键。在测试中,它似乎工作得很好(比使用未分区的row_number()或RDD和zipWithIndex更快),但我不确定它是否安全,或者是否有可能保证安全。
我的玩具解决方案是这样的:

def getKeyF(offsetKeyByPartitionDict,initial=1):
   @pandas_udf('long')
   def _internal_udf(s: Iterator[pd.Series]) -> Iterator[pd.Series]:
      curKey=initial
      for x in s:
         offsetKey = offsetKeyByPartitionDict[x[0]] #should be the same each time
         yield x.index+curKey+offsetKey
         curKey = curKey + x.size
   return _internal_udf

#Get partition information
rowsWithPartition = rows.select('*',F.spark_partition_id().alias('partition'))
partitionSizes = rowsWithPartition.groupBy('partition').count()
almostAll = Window.orderBy(F.col('partition')).rowsBetween(Window.unboundedPreceding, Window.currentRow-1)
#Pass 1: Get rolling sum to get which key each partition should start at. Technically, this does shuffle but it's relatively very small
startingKeyByPartition = dict(partitionSizes.select('partition',(F.coalesce(F.sum('count').over(almostAll),F.lit(0)).alias('startIndex'))).collect())
#Pass 2: Get the keys for each partition
keys = rowsWithPartition.select('businessKey', (getKeyF(startingKeyByPartition,1)('partition')).alias('key'))

当我在一台单节点机器上本地尝试这个方法时,它工作了,但这并不意味着它在所有情况下都能工作。
我担心的是:
1.我知道Iterator[Series]-〉Iterator[Series] pandas UDF在Arrow batches上迭代,但它从来没有说一个函数调用的所有batches的并集是一个分区的数据。如果有一些不正确的情况,这段代码就会崩溃,因为它假设它是在给定分区id的所有行上运行的。这是我可以依赖的吗?
1.我真的能确定每一次传递中的每一行的“partition”列都是相同的吗?我知道函数是不确定的,但惰性计算是否意味着列也是不确定的?缓存rowsWithPartition Dataframe 是否能确保分区列保持不变,或者是否有奇怪的情况,这也不起作用?
1.如果我不能同时保证这两点,有没有办法让这个想法安全,或者它是DOA?
谢谢!

6yt4nkrj

6yt4nkrj1#

我至少从这个pull请求中找到了(1)的答案。基本上,我写的东西不安全。它适用于这个版本,但将来可能会崩溃。
仍然不确定(2)。看起来如果分区id被允许任意切换,即使在集群崩溃或任务失败的情况下,如果 Dataframe 只被部分缓存,那么它可能会导致不正确的结果,但我不知道这有多健壮。

相关问题