我正在与Kafka2.11和相当新的工作。我试图了解Kafka消费群体,我有3个Spark应用程序消费从同一个主题,他们每个人都从该主题收到所有的消息。由于我在应用程序中没有提到任何消费者组id,我假设kafka正在为每个消费者组分配一些不同的消费者组id。我需要使用下面的命令重置其中一个应用程序的kafka偏移量。因为我不知道我的应用程序的使用者组名称,所以我被困在这里了。我是否需要在应用程序中显式分配组id,然后在下面的命令中使用它?
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute
如果这是真的,我如何获得每个应用程序的消费者组id?我不能
3条答案
按热度按时间s4chpxco1#
如果你去Spark代码你可以找到
KafkaSourceProvider
类,该类负责kafka源读取器,您可以看到生成random group.id:您可以使用搜索group.id
spark-kafka-source
前缀,但找不到特定组的group.id。要查找所有使用者组ID,可以使用以下命令:
./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list
要检查使用者组偏移,可以使用以下命令:./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe
ccrfmcuu2#
消费者
group.id
是强制性的。如果不设置消费者group.id
,您将得到异常。所以很明显,你在代码中的某个地方设置它,或者你正在使用的框架或库在内部设置它。你应该一直group.id
你一个人。您可以使用以下命令获取使用者组ID:
zu0ti5jz3#
由于我在应用程序中没有提到任何消费者组id,我假设kafka正在为每个消费者组分配一些不同的消费者组id
Kafka经纪人不会将消费者组名称分配给与其相关的消费者。当消费者连接、订阅某个主题时,它“加入”了一个组。如果使用spark应用程序时未指定任何使用者组,则意味着从spark应用程序连接到kafka时使用的库/框架在某种程度上是在为使用者组本身指定名称。