Kafka中的默认消费者组id

t8e9dugd  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(1001)

我正在与Kafka2.11和相当新的工作。我试图了解Kafka消费群体,我有3个Spark应用程序消费从同一个主题,他们每个人都从该主题收到所有的消息。由于我在应用程序中没有提到任何消费者组id,我假设kafka正在为每个消费者组分配一些不同的消费者组id。我需要使用下面的命令重置其中一个应用程序的kafka偏移量。因为我不知道我的应用程序的使用者组名称,所以我被困在这里了。我是否需要在应用程序中显式分配组id,然后在下面的命令中使用它?

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute

如果这是真的,我如何获得每个应用程序的消费者组id?我不能

s4chpxco

s4chpxco1#

如果你去Spark代码你可以找到 KafkaSourceProvider 类,该类负责kafka源读取器,您可以看到生成random group.id:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

  override def createSource(
    sqlContext: SQLContext,
    metadataPath: String,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): Source = {
      validateStreamOptions(parameters)
      // Each running query should use its own group id. Otherwise, the query may be only assigned
      // partial data since Kafka will assign partitions to multiple consumers having the same group
      // id. Hence, we should generate a unique id for each query.
      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    ...
  }

您可以使用搜索group.id spark-kafka-source 前缀,但找不到特定组的group.id。
要查找所有使用者组ID,可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --list 要检查使用者组偏移,可以使用以下命令: ./kafka-consumer-groups.sh --bootstrap-server KAFKKA_ADDRESS --group=GROUP_ID --describe

ccrfmcuu

ccrfmcuu2#

消费者 group.id 是强制性的。如果不设置消费者 group.id ,您将得到异常。所以很明显,你在代码中的某个地方设置它,或者你正在使用的框架或库在内部设置它。你应该一直 group.id 你一个人。
您可以使用以下命令获取使用者组ID:

bin/kafka-consumer-groups.sh  --list --bootstrap-server <kafka-broker-ip>:9092
zu0ti5jz

zu0ti5jz3#

由于我在应用程序中没有提到任何消费者组id,我假设kafka正在为每个消费者组分配一些不同的消费者组id
Kafka经纪人不会将消费者组名称分配给与其相关的消费者。当消费者连接、订阅某个主题时,它“加入”了一个组。如果使用spark应用程序时未指定任何使用者组,则意味着从spark应用程序连接到kafka时使用的库/框架在某种程度上是在为使用者组本身指定名称。

相关问题