我通过Kafka传输大量数据。然后,我有Spark流,这是消费这些信息。基本上,spark streaming抛出了以下错误:
kafka.common.OffsetOutOfRangeException
现在我知道这个错误意味着什么。所以我把保留期改为5天。但是我还是遇到了同样的问题。然后,我列出了一个主题的所有消息,从kafka开始。毫无疑问,Kafka流媒体部分开始时的大量消息都不存在,而且由于spark流媒体稍微落后于Kafka流媒体部分,spark流媒体尝试使用已被Kafka删除的消息。不过,我认为更改保留策略可以解决以下问题:
--add-config retention.ms=....
我怀疑的是,Kafka正在删除主题中的消息,以便为新消息腾出空间(因为我们正在传输大量数据)。我是否可以配置一个属性来指定在删除之前的消息之前kafka可以存储多少字节的数据?
2条答案
按热度按时间mklgxw1f1#
使用“主题配置”属性创建主题时,可以设置主题的最大大小
retention.bytes
通过控制台,如:或者可以使用全局代理配置属性
log.retention.bytes
设置所有主题的最大大小。重要的是要知道
log.retention.bytes
不强制对主题大小进行硬限制,但它只是向Kafka发出信号,告诉他何时开始删除最旧的消息nc1teljy2#
解决此问题的另一种方法是在配置中指定spark参数: