我有一个DataFrame,需要将其写入我们的自定义数据存储。此数据存储要求根据特定列的(account_id)值按组写入数据。例如,给定此数据:
account_id | date | value
1 | 2023-01-01 | 1
1 | 2023-01-02 | 2
2 | 2023-01-01 | 3
字符串
我需要将account_id为1的组与account_id为2的组分开处理。
我使用df.repartition(df.col("account_id")).foreachPartition(processPartition)
(在Scala中)进行了尝试
我希望我的单元测试中的模拟数据存储接收2个调用,一个是account_id为1的2行,另一个是account_id为2的1行,但它只接收到一个包含所有3行的写调用。这就好像Spark忽略了我的分区要求。
阅读了这篇文章,我得到的印象是分区纯粹是一种性能工具(用于控制如何使用并行性),Spark在这里决定甚至没有理由分割微小的框架。然而,我找不到任何文件可以清楚地说明这一点。我的问题是,在我的情况下,这不是关于性能,而是关于正确性。我也找不到任何文件告诉我该怎么做。我遇到了DataFrameWriter的partitionBy,但是这个文档都是关于磁盘上的文件的,而我需要使用我已经编写的客户端库写入我们的自定义数据存储。
最后,我在StackOverflow上发现了类似的问题,但他们明确提到DataFrame很小,性能不是问题,所以公认的答案是首先获取不同值的列表,然后迭代并为这些值创建新的DataFrames过滤alaaccount_ids.map(id => (df.where(($"account_id" === id))))
,但这将击败Spark的隐式并行性。
1条答案
按热度按时间ssgvzors1#
分区可以保证所有具有相同
account_id
的帐户最终都在同一个分区中,但不能保证单个分区只包含一个account_id
。通常情况下,account_id
的数量会比分区的数量多,所以Spark没有机会为每个account_id
创建一个分区。如果在调用repartition
时省略分区数,Spark将使用默认的分区数(spark.sql.shuffle.partitions
)。但是,您可以按键对每个分区中的行进行排序(不需要额外的 Shuffle ),然后单独处理每个键:
字符串
输出量:
型
输出中的每一行都是通过对
doSomething
的一次调用创建的,并且只包含具有唯一键的行,同时仍然并行处理分区。