我们有非常简单的spark流作业(用java实现),即:
通过directstream从kafka读取JSON(kafka消息上的ACK已关闭)
将json解析为pojo(使用gson-我们的消息只有~300字节)
将pojoMap到键值的元组(value=object)
reducebykey(自定义reduce函数-始终比较对象的1个字段-质量-并使对象示例具有更高的质量)
将结果存储在状态中(通过mapwithstate存储每个键具有最高质量的对象)
将结果存储到hdfs
json是用1000个id(密钥)生成的,所有的事件都被随机分配到kafka主题分区。这也意味着,生成的对象集最大为1000,因为作业只存储每个id的最高质量的对象。
我们在aws emr(m4.xlarge=4核,16 gb内存)上运行性能测试,参数如下:
执行器数量=节点数量(即每个节点1个执行器)
kafka分区数=节点数(即在我们的例子中也是执行者)
批量=10(s)
推拉窗=20(s)
窗口大小=600(s)
块大小=2000(ms)
默认并行度-尝试了不同的设置,但是当默认并行度为=节点数/执行器数时,会获得最佳结果
kafka集群只包含1个代理,在峰值负载期间使用最多30-40%(我们将数据预填充到主题中,然后独立执行测试)。我们已经尝试增加num.io.threads和num.network.threads,但是没有显著的改进。
性能测试(约10分钟连续负载)的结果为(Yarn主节点和驱动节点位于以下节点计数的顶部):
2个节点-能够处理最多150000个事件/秒,而无任何处理延迟
5个节点-280000个事件/秒=>25%的惩罚(如果与预期的“几乎线性可伸缩性”相比)
10个节点-380000个事件/秒=>50%的惩罚(如果与预期的“几乎线性可伸缩性”相比)
两个节点的cpu利用率为~
我们还尝试了其他设置,包括:-测试低/高分区数-测试defaultparallelism的低/高/默认值-使用更多执行者进行测试(例如,将资源划分为30个执行者,而不是10个),但是上面的设置给了我们最好的结果。
那么-问题是-Kafka+Spark(几乎)线性可伸缩吗?如果它的可伸缩性比我们的测试显示的要好得多,那么它可以如何改进。我们的目标是支持成百上千的spark执行器(即可伸缩性对我们来说至关重要)。
1条答案
按热度按时间ijnw1ujt1#
我们已通过以下方式解决此问题:
提高Kafka集群的容量
更多的cpu能力-增加了kafka的节点数(每2个spark Executur节点1个kafka节点似乎可以)
更多的经纪人-基本上每个执行人有一个经纪人给了我们最好的结果
设置适当的默认并行度(集群中的核心数*2)
确保所有节点的工作量大致相同
批大小/块大小应等于或等于执行者数量的倍数
最后,我们已经能够实现由spark集群以10个executor节点处理的110000个事件/秒。所做的调整还提高了在节点较少的配置上的性能->当从2个spark executor节点扩展到10个(aws上的m4.xlarge)时,我们实际上实现了线性可伸缩性。
起初,kafka节点上的cpu没有接近极限,但是它无法响应spark执行器的需求。
感谢所有的建议,尤其是@arturbiesiadowski,他建议Kafka集群的大小不正确。