我的spark结构化流媒体应用程序运行了几个小时之后,就出现了这个错误
java.lang.IllegalStateException: Partition [partition-name] offset was changed from 361037 to 355053, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
当然,每次的偏移量都不一样,但第一个偏移量总是大于第二个偏移量。主题数据不能过期,因为该主题的保留期为5天,我昨天重新创建了此主题,但今天再次出现错误。从中恢复的唯一方法是删除检查点。
spark的《Kafka整合指南》在 failOnDataLoss
选项:
当数据可能丢失(例如,主题被删除或偏移量超出范围)时,是否使查询失败。这可能是虚惊一场。当它不能按预期工作时,可以禁用它。如果由于数据丢失而无法从提供的偏移量中读取任何数据,则批处理查询将始终失败。
但我找不到任何进一步的信息,当这可以被认为是一个假警报,所以我不知道是否安全设置 failOnDataLoss
至 false
或者如果我的集群有实际问题(在这种情况下,我们实际上会丢失数据)。
更新:我已经调查了kafka的日志,在所有spark失败的情况下,kafka都记录了几个这样的消息(我假设每个spark消费者一个):
INFO [GroupCoordinator 1]: Preparing to rebalance group spark-kafka-...-driver-0 with old generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
2条答案
按热度按时间bwitn5fc1#
这似乎是spark和SparkSQLKafka库的旧版本中的一个已知错误。
我发现以下jira门票相关:
spark-28641:微批量执行提交的偏移量大于可用偏移量
spark-26267:Kafka的资料来源可能会重新处理数据
kafka-7703:调用“seektoend”后,kafkaconsumer.position可能返回错误的偏移量
简而言之,引用开发人员的话:
“这是Kafka的一个已知问题,请参阅Kafka7703。这在spark-26267的2.4.1和3.0.0中是固定的。请将spark升级到更高版本。另一种可能性是将Kafka升级到2.3.0,Kafka的一面是固定的。”
“kafka-7703只存在于kafka 1.1.0及更高版本中,因此可能的解决方法是使用没有此问题的旧版本。这不会影响spark 2.3.x及以下版本,因为我们默认使用kafka 0.10.0.1。”
在我们的例子中,我们在hdp3.1平台上也面临同样的问题。我们有spark 2.3.2和spark sql kafka库(https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.3.2.3.1.0.0-78)但是,使用kafka客户端2.0.0。这意味着我们将面临由于以下条件而导致的错误:
我们的Spark<2.4.1
1.1.0<我们的Kafka<2.3.0
jmo0nnb32#
我不再有这个问题了。我做了两个改变:
disabled yarn的动态资源分配(这意味着我必须手动计算执行器的最佳数量等,并将它们传递给
spark-submit
)升级至spark 2.4.0,Kafka客户端也从0.10.0.1升级至2.0.0
禁用动态资源分配意味着不会在应用程序运行时创建和终止执行器(=使用者),从而消除了重新平衡的需要。所以这很可能是阻止错误发生的原因。