ApacheSpark—kafka direct stream是否自行创建使用者组(因为它不关心应用程序中给定的group.id属性)

mdfafbf1  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(238)

假设我刚刚推出了kafka direct stream+spark流媒体应用程序。对于第一批,驱动程序中的流上下文连接到kafka并获取startoffset和endoffset。然后,它启动一个带有这些开始和结束偏移范围的spark作业,以便执行者从kafka获取记录。我的问题从这里开始。当第二批的时间到了时,流式处理上下文连接到开始和结束偏移范围的kafka。当没有允许存储最后提交偏移量值的使用者组(因为direct stream不考虑group.id)时,kafka如何给出这些范围?

jdgnovmf

jdgnovmf1#

使用kafka消费api时,总是有一个消费组。不管您处理的是哪种流(spark direct streaming、spark structured streaming、kafka consumer的java/scala api……)。
因为直接流不考虑group.id
请参阅spark+kafka直接流媒体集成指南(spark-streaming-kafka010的代码示例),了解如何声明消费群体:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

即使您没有在配置中声明使用者组,仍然会为您创建一个(随机的)使用者组。
检查日志以查看应用程序中使用了哪个group.id。

相关问题