我在scala 2.11中使用spark v2.0.2,在kafka v0.8.2中使用spark Streaming(createdirectstream方法)。
我正在使用来自kafka主题的消息,并且有一种方法可以打印所创建RDD的偏移范围。但是,我想知道是否有办法访问spark当前处理的每条消息的偏移量,以便我可以将最新处理的消息偏移量id提交给zookeeper,或者定期将最新处理的消息id保存到db或文件?
我读过spark文档,但找不到关于如何实现它的信息。
我在scala 2.11中使用spark v2.0.2,在kafka v0.8.2中使用spark Streaming(createdirectstream方法)。
我正在使用来自kafka主题的消息,并且有一种方法可以打印所创建RDD的偏移范围。但是,我想知道是否有办法访问spark当前处理的每条消息的偏移量,以便我可以将最新处理的消息偏移量id提交给zookeeper,或者定期将最新处理的消息id保存到db或文件?
我读过spark文档,但找不到关于如何实现它的信息。
暂无答案!
目前还没有任何答案,快来回答吧!