Kafka消费者群体在Spark结构化流媒体中重要性

kx5bkwkv  于 2023-10-15  发布在  Apache
关注(0)|答案(2)|浏览(100)

计划构建spark结构化流应用程序,从Kafka主题中读取json数据,解析数据并写入任何存储。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

根据spark文档,消费者组ID由Apache Spark内部生成。默认情况下,每个查询生成一个唯一的组ID用于阅读数据,或者我们可以使用www.example.com指定kafka.group.id。我理解在一个消费者组中,任何时候一个Kafka分区只能由一个消费者消费。如果我们所要做的就是读取、解析和写入,那么kafka.group.id的意义是什么,我们需要显式地设置它吗?

qv7cva1a

qv7cva1a1#

我们是否需要明确地设置这个?
不。正如你所发现的,Spark会自动创建一个。也可以给予组前缀而不是静态字符串。
如果我们所要做的只是读、解析和写,
这不是一个真正的Spark问题。Kafka消费者使用消费者组来实现可伸缩性并防止重复读取
您可以使用该值手动将偏移量存储回Kafka中,但检查点文件可以给予比Kafka的至少一次行为更好的处理保证。

xiozqbni

xiozqbni2#

我会避免使用它。从手册中,有一个特定的用例:
从Kafka阅读时在Kafka消费者中使用的Kafka组ID。**请谨慎使用。**默认情况下,每个查询都会生成一个唯一的组id,用于阅读数据。这确保了每个Kafka源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些场景下(例如,Kafka基于组的授权),您可能希望使用特定的授权组ID来读取数据。您可以选择设置组ID。但是,要非常小心地执行此操作,因为它可能会导致意外行为。并发运行的查询(批处理和流处理)或具有相同组ID的源可能会相互干扰,导致每个查询只能读取部分数据。当查询快速连续启动/重新启动时,也可能发生这种情况。要最小化此类问题,请将Kafka消费者会话超时(通过设置选项“kafka.session.timeout.ms“)设置为非常小。设置此选项后,将忽略“groupIdPrefix”选项。
我已经填充数据湖有一段时间了,在上一个项目中,我们只需要多个查询-每个应用程序1个,(1个应用程序用于RAW-摄取,1个应用程序用于REF-摄取,1个应用程序用于BUS-数据区处理和摄取),它工作得很好。在3.x升级中,我们考虑了您的要求,但保持原样。
Spark Structured Streaming“位于顶部”,并根据您定义的内容分配资源,因此性能在大多数方面都很好,您可以添加更多。

相关问题