我有两个大的json文件,我们通过kafka流。因此,其中一个文件被推入topic1的分区0,另一个文件被推入topic1的分区1。我们使用spark流式查询和水印来连接这些文件并进行必要的计算。虽然我们加入了这些文件并进行了简单的计算,但在spark ui中,我们确实发现spark engine完成了200多个任务,这些任务需要6分钟以上的时间。
下面是我们的几个问题:1)为什么这些简单的操作有这么多的任务?2) 大型json是否在多个执行器之间拆分?根据我的理解,不可能对json的分割部分执行操作。它必须在遗嘱执行人身上。这是否意味着我们不能在多个执行器之间拆分大型xml或json以提高并行性?
谢谢
1条答案
按热度按时间qybjjes11#
都是关于分区的:
200是spark shuffle partition参数的默认值,它定义了洗牌后的分区。在您的情况下,join会导致一次洗牌,您可以使用
spark.sql.shuffle.partitions
在kafka源代码中,kafka中的分区数=spark中的分区数(在master上有merged pr,它可以设置分区数=x*kafka中的分区,在这里您可以定义x-它还没有发布)