我有Spark批处理工作,读取Kafka的数据来源。kafka的偏移量由批处理作业本身管理。作业从持久化位置(hdfs)读取开始偏移量,并使用使用者api获取结束偏移量。使用spark kafka源将数据从开始偏移量拉到结束偏移量。有一个Kafka主题有400个分区。一旦作业完成,结束偏移量就会持久化到hdfs中,以便作业知道下一次运行的开始偏移量是多少。
对于非常少量的数据,此作业运行良好。当开始偏移量和结束偏移量相隔很远(通常相隔12小时)时,作业会遇到性能问题并失败,并出现内存不足错误。下面是数据大小和spark提交配置。
Data size - 25 GB per kakfa partition. 25 * 400 GB = 10 TB
Executor Memory - 60G
No of executors - 100
Executor memory overhead - 8G
No of cores - 2
spark job的核心功能
读Kafka的资料
从kafka源代码中,将kafka值读取为json df忽略kafka键
另存为json数据,按json df中的列(视图名称)n分区。
注意-每个视图的名称都可以有自己的json模式,这就是为什么我要用这种方式分区以方便下游访问。
由于没有无序写入(重新分区、连接、聚合等),我希望当前的配置足以完成工作负载。
观察
正如预期的那样,有400个任务用于读取Kafka数据
保存json数据的400个任务
下面是失败任务的消息。
20/07/27 21:23:13 INFO clients.FetchSessionHandler: [Consumer clientId=consumer-3, groupId=spark-kafka-relation-b5dad8f2-cdab-4c03-b5c0-72e06bc43177-executor] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3233: org.apache.kafka.common.errors.DisconnectException.
20/07/27 21:23:34 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (136 times so far)
20/07/27 21:23:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (132 times so far)
20/07/27 21:24:22 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (137 times so far)
20/07/27 21:24:36 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (133 times so far)
20/07/27 21:25:09 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (138 times so far)
20/07/27 21:25:20 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (134 times so far)
20/07/27 21:25:53 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (139 times so far)
20/07/27 21:26:07 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (135 times so far)
20/07/27 21:26:39 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (140 times so far)
20/07/27 21:26:51 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (136 times so far)
20/07/27 21:27:27 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (141 times so far)
20/07/27 21:27:35 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (137 times so far)
20/07/27 21:28:15 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (142 times so far)
20/07/27 21:28:19 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (138 times so far)
20/07/27 21:28:58 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (143 times so far)
20/07/27 21:29:02 INFO sort.UnsafeExternalSorter: Thread 100 spilling sort data of 15.9 GB to disk (139 times so far)
20/07/27 21:29:42 INFO sort.UnsafeExternalSorter: Thread 101 spilling sort data of 15.9 GB to disk (144 times so far)
20/07/27 21:29:47 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/07/27 21:29:47 INFO storage.DiskBlockManager: Shutdown hook called
20/07/27 21:29:47 INFO util.ShutdownHookManager: Shutdown hook called
ExecutorLostFailure (executor 293 exited unrelated to the running tasks) Reason: Container marked as failed: container_e35_1595528864766_0024_01_000359 on host: <hostname> Exit status: -100. Diagnostics: Container released on a *lost* node.
我在第一次阅读Kafka(2000&4000)的数据后尝试重新划分,但这只会加剧问题,而不是起到帮助作用。对如何调整工作有什么建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!