使用spark流处理kafka消息时遇到的挑战

mnemlml8  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(627)

我想实时处理web服务器上报告的消息。web服务器上报告的消息属于不同的会话,我想进行一些会话级聚合。为此,我计划使用由KafkaSpark流前端。在我开始之前,我已经列出了这个架构将带来的一些挑战。熟悉这个生态系统的人能帮我解决这些问题吗:
如果每个kafka消息都属于一个特定的会话,那么如何管理会话相关性,以便同一个spark执行器可以看到链接到会话的所有消息?
如何确保spark执行器按照kafka报告的顺序处理属于会话的消息?我们能在不限制线程数和不产生处理开销(如按消息时间戳排序)的情况下实现这一点吗?
何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何从最后一个检查点恢复状态?
如果节点(执行器/驱动程序)在检查其状态之前崩溃,如何恢复状态?如果spark通过重放消息来重新创建状态rdd,那么它从何处开始重放kafka消息:wards上的最后一个检查点,还是处理重新创建分区所需的所有消息?spark streaming是否可以跨多个spark streaming批恢复状态,或者仅对当前批恢复状态,即如果在最后一批中没有执行检查点检查,是否可以恢复状态?

1aaf6o9v

1aaf6o9v1#

如果每个kafka消息都属于一个特定的会话,那么如何管理会话相关性,以便同一个spark执行器可以看到链接到会话的所有消息?
kafka将主题划分为多个分区,每个分区一次只能由一个使用者读取,因此需要确保属于一个会话的所有消息都进入同一个分区。分区分配是通过分配给每条消息的键来控制的,因此实现这一点的最简单方法可能是在发送数据时使用会话id作为键。这样,同一消费者将在一个会话中获得所有消息。但有一个警告:当使用者加入或离开consumergroup时,kafka将重新平衡分配给使用者的分区。如果这种情况在会话中期发生,那么它可以(并且将)发生,即会话的一半消息将传递给一个使用者,而在重新平衡之后,另一半消息将传递给另一个使用者。为了避免这种情况,您需要手动订阅代码中的特定分区,以便每个处理器都有其特定的分区集,并且不会更改这些分区。看看sparkkafka组件代码中的consumerstrategies.assign。
如何确保spark执行器按照kafka报告的顺序处理属于会话的消息?我们能在不限制线程数和不产生处理开销(如按消息时间戳排序)的情况下实现这一点吗?
kafka保留了每个分区的顺序,所以这里不需要做很多事情。唯一的办法是避免同时从生产者向代理发出多个请求,您可以通过生产者参数max.in.flight.requests.per.connection配置这些请求。只要你保持在1,你应该是安全的,如果我理解你的设置正确。
何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何从最后一个检查点恢复状态?
我建议阅读spark streaming+kafka集成指南中的偏移存储部分,它应该已经回答了很多问题。
简短的版本是,您可以将最后一次读取偏移量持久化到kafka中,并且无论何时检查您的执行器,都应该这样做。这样,每当一个新的执行器开始处理时,不管它是否从检查点恢复,它都知道从kafka的哪里读取。
如果节点(执行器/驱动程序)在检查其状态之前崩溃,如何恢复状态?如果spark通过重放消息来重新创建状态rdd,那么它从何处开始重放kafka消息:wards上的最后一个检查点,还是处理重新创建分区所需的所有消息?spark streaming是否可以跨多个spark streaming批恢复状态,或者仅对当前批恢复状态,即如果在最后一批中没有执行检查点检查,是否可以恢复状态?
我对spark的了解有点不可靠,但我想说的是,这不是kafka/spark所做的事情,而是您需要积极地用代码来影响的事情。默认情况下,如果启动了一个新的kafka流,并且没有发现以前提交的偏移量,那么它将只从主题的末尾开始读取,因此它将获得在使用者启动之后生成的任何消息。如果您需要恢复状态,那么您要么需要知道从哪个确切的偏移量开始重新读取消息,要么就从头开始重新读取。分发分区时,可以将要读取的偏移量传递到上述.assign()方法中。
我希望这有点帮助,我相信这不是一个完整的答案,所有的问题,但这是一个相当广泛的领域,工作,让我知道,如果我可以进一步帮助。

相关问题