重新启动pyspark作业不会获得在pyspark使用者关闭时插入kafka主题的记录

ruarlubt  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(420)

我正在运行一个pyspark作业,数据流来自kafka。我试图在windows系统中复制一个场景,以了解当消费者在数据不断输入kafka时出现故障时会发生什么。
这就是我所期望的。
producer启动并生成消息1、2和3。
消费者在线并使用消息1、2和3。
现在消费者因为某种原因而下降,而生产者产生消息4、5和6等等。。。
当消费者出现时,我的期望是它应该读到它结束的地方。因此,消费者必须能够阅读信息4、5、6等等。。。。
我的pyspark应用程序无法实现我的期望。下面是我如何创建spark会话的。

session.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "clickapijson")
  .option("startingoffsets" , "latest") \
  .load()

我在谷歌上搜索并收集了不少信息。似乎groupid与此相关。kafka维护每个使用者在特定groupid中读取的偏移量的跟踪。如果消费者订阅了一个带有groupid的主题,比如g1,kafka会注册这个组和consumerid,并跟踪这个groupid和consumerid。如果消费者由于某种原因不得不下降,并使用相同的groupid重新启动,那么kafka将拥有已经读取的偏移量的信息,因此消费者将从其停止的位置读取数据。
当我使用下面的命令在cli中调用consumer job时,就会发生这种情况。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test

现在,当我的生产者生成消息1、2和3时,消费者就可以消费了。在读取第3条消息后,我终止了正在运行的使用者作业(cli.bat文件)。我的制作人生成消息4、5和6,依此类推。。。。现在我带回我的消费者作业(cli.bat文件),它能够从它停止的地方(从消息4)读取数据。这是我所期望的。
我不能在Pypark做同样的事。
当我包括 option("group.id" , "test") ,它抛出一个错误,称为kafka option group.id 不支持,因为用户指定的使用者组不用于跟踪偏移。
在观察控制台输出时,每次启动pyspark使用者作业时,它都会创建一个新的groupid。如果我的pyspark作业以前使用groupid运行并且失败,那么当它重新启动时,它不会拾取相同的旧groupid。它正在随机获取一个新的groupid。kafka有上一个groupid的偏移量信息,但没有当前新生成的groupid。因此,我的pyspark应用程序无法读取输入kafka的数据。
如果是这样的话,那么当消费者的工作由于一些失败而下降时,我会丢失我的数据吗?
如何将自己的groupid赋予pyspark应用程序,或者如何使用相同的旧groupid重新启动pyspark应用程序?

tkqqtvp1

tkqqtvp11#

在当前的spark版本(2.4.5)中,无法提供您自己的 group.id 因为它是由spark自动创建的(正如您已经观察到的)。有关Kafkaspark reading中偏移管理的详细信息,请参见下文:
请注意,无法设置以下kafka参数,kafka源或接收器将引发异常:
group.id:kafka source将为每个查询自动创建一个唯一的组id。
auto.offset.reset:设置源选项startingoffset以指定从何处开始。结构化流媒体管理哪些偏移量是内部消耗的,而不是依赖Kafka消费者来完成。这将确保动态订阅新主题/分区时不会丢失任何数据。请注意,startingoffset仅在新的流式查询启动时适用,并且恢复总是从查询停止的地方开始。
enable.auto.commit:kafka源不提交任何偏移量。
为了让spark能够记住它停止从kafka读取的位置,需要启用检查点并提供存储检查点文件的路径位置。在python中,这看起来像:

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()

有关检查点的更多详细信息,请参阅spark文档中有关使用检查点从故障中恢复的内容。

相关问题