考虑一个spark结构化的streaming作业,它从kafka读取消息。
在我们定义了多个主题的情况下,代码如何管理每个主题的偏移量?
我一直在学习kafkamicrobatchstream课程,但不知道get如何抵消不同主题的影响。
def latestoffset(start:offset,readlimit:readlimit):偏移量;方法将只返回一个偏移量。
尝试理解实现,因为我需要编写从多个rdbms表读取的自定义源代码,每个表都有自己的偏移量。偏移量只能在rdbms表中管理。
考虑一个spark结构化的streaming作业,它从kafka读取消息。
在我们定义了多个主题的情况下,代码如何管理每个主题的偏移量?
我一直在学习kafkamicrobatchstream课程,但不知道get如何抵消不同主题的影响。
def latestoffset(start:offset,readlimit:readlimit):偏移量;方法将只返回一个偏移量。
尝试理解实现,因为我需要编写从多个rdbms表读取的自定义源代码,每个表都有自己的偏移量。偏移量只能在rdbms表中管理。
1条答案
按热度按时间hc2pp10m1#
当结构化streaming作业从kafka源获取数据时,偏移量通常存储在检查点文件中。在这些文件中,您将找到每个topicpartition的最新处理偏移量(基于结构化streaming作业创建的使用者组)。术语“topicpartition”意味着偏移量存储在每个分区的每个主题中。
此检查点适用于作为源的kafka主题,因为偏移量是唯一的标识符,在消息的生命周期内不会更改。
从rdbms读取数据时,您需要跟踪streaming作业已经使用的每一行,例如通过跟踪主键。但是,您需要考虑已经使用的行的更新。
我假设这就是为什么有(尚未)结构化的streaming rdbms源可用的原因,正如结构化的streaming输入源编程指南中提到的:
文件源
kafka来源
插座电源(测试用)
速率源(用于测试)