如何为spark structured streaming指定kafka consumer的组id?

xmq68pz9  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(700)

我想在同一个emr集群中运行2个spark结构化流媒体作业,以使用相同的kafka主题。两个作业都处于运行状态。但是,只有一个作业可以获得Kafka数据。我的Kafka零件配置如下。

.format("kafka")
        .option("kafka.bootstrap.servers", "xxx")
        .option("subscribe", "sametopic")
        .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "./cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
        .load()

我没有设置group.id。我猜两个作业中的同一个组id是导致此问题的原因。但是,当我设置group.id时,它会抱怨“用户指定的使用者组不用于跟踪偏移。”。解决这个问题的正确方法是什么?谢谢!

3hvapo4f

3hvapo4f1#

你需要运行spark v3。
从https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Kafka集团id
从Kafka读取时在Kafka消费者中使用的Kafka组id。小心使用。默认情况下,每个查询为读取数据生成一个唯一的组id。这样可以确保每个kafka源都有自己的使用者组,该使用者组不会受到任何其他使用者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于kafka组的授权),您可能希望使用特定的授权组id来读取数据。您可以选择设置组id。但是,请非常小心,因为这可能会导致意外行为。同时运行的查询(批处理和流式处理)或具有相同组id的源可能相互干扰,导致每个查询只读取部分数据。当连续快速启动/重新启动查询时,也可能发生这种情况。要最小化此类问题,请将kafka使用者会话超时设置为非常小(通过设置选项“kafka.session.timeout.ms”)。设置此选项后,将忽略选项“groupidprefix”。

相关问题