我有一个rest服务,我们称之为mdd,它有一个kafka消费者。当我第一次启动rest服务时,另一个服务告诉mdd的消费者订阅一个特定的主题,一切似乎都很顺利。
然后服务告诉mdd的消费者订阅另一个主题。我现在的方法是通过consumer.assign()方法。基本上,如果引入了一个新主题,而消费者没有被分配到该主题,那么我就将这个新主题分配给消费者。因此,一个消费者现在被分配到两个不同的主题。
这个消费者轮询消息并将它们存入hdfs。
现在我注意到的是,当第二个主题的订阅进来时,有时我会遇到一个错误,即无法在hdfs中追加到文件中,当我查看日志时,它试图追加一些本不应在以后追加的数据。例如,Kafka的数据顺序是a,b,c。当mdd将a附加到hdfs时,它尝试附加c(而不是b),同时也尝试附加b。另一个注意事项是,此时没有来自第一个主题的数据,只有来自第二个主题的数据流进来。因此,目前,只有一个Kafka主题的数据流在任何给定的时间。
有人知道会发生什么吗?当我将一个使用者分配给多个主题时,是否有可能产生一些线程问题?因为当消费者被分配到一个主题时,一切似乎都很顺利,但一旦它被分配到多个主题,我就无法在hdfs中追加到文件,因为其他某个编写器已经拥有了租约。这种错误并不经常发生,只是非常随机的。
另外,建议的修复方法是每次创建一个新主题时,创建一个新的kafka消费者吗?
1条答案
按热度按时间fumotvh31#
只让一个消费者阅读多个主题的消息绝对是有效和可行的。您遇到的问题是因为kafka当前不支持使用手动分区分配(使用kafkaconsumer#assign)和组管理(使用kafkaconsumer#subscribe)。
为了支持订阅新创建的主题,您可以尝试调用kafkaconsumer#subscribe,将正则表达式传递给它,匹配所有新创建的主题。