这是关于将生产中使用窗口的现有代码库从kafka-clients、kafka-streams、spring-kafka 2.4.0升级到2.6.x,以及将spring-boot-starter-parent从2.2.2.RELEASE升级到2.3.x,因为2.2与kafka-streams 2.6不兼容。
现有代码中的这些Bean在旧版本(2.4.0、2.2 Spring 发布版)中有如下所述:
@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder) {
//Your topology code
return streamsBuilder.build();
}
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
//Your kafka streams code
return kafkaStreams;
}
现在,在将kafka streams、kafka clients升级到2.6.2,并将spring kafka升级到2.6.x之后,观察到以下异常:
2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
6条答案
按热度按时间piztneat1#
这里的问题是较新版本的spring-kafka自动初始化基于拓扑bean的kafka流的另一个示例,并且generickafkaStreams的另一个bean从现有代码库初始化,这导致多个线程试图锁定状态目录,从而导致错误。
即使在Spring Boot 级别禁用KafkaAutoConfiguration也不会禁用这种行为。这是一个很难识别的问题,并且浪费了很多时间。
修复方法是去掉拓扑bean,并使用我们自己的自定义kafka streams bean,如下所示:
fzwojiic2#
如果您在Spring Cloud Streams Kafka Streams Binder 3.0风格的应用程序中有一个复杂的Kafka Streams拓扑,您可能需要为不同的函数指定不同的应用程序ID,如下所示:
h7appiyu3#
当您在
same machine
上运行多个相同的application(name/id)
时,可能会发生类似的错误。请访问State.dir以获得此想法。
您可以在Kafka配置中添加它,并使它对每个示例都是唯一的,
如果您使用的是spring cloud stream(不能在同一台机器上使用相同的端口):
更新:
在
spring stream kafka
的情况下:或:
2g32fytz4#
我处理过以下版本的问题:
检查此:状态目录
默认情况下,它是在临时文件夹中创建的,kafka streams应用程序ID如下:/var/文件夹/xw/xgslnvzj 1 zj 6 wp86 wpd 8hqjr 0000 gn/T/Kafka流/${Spring.Kafka.流.应用程序ID}/.lock
如果两个或多个Kafka Streams应用使用相同的 spring.kafka.streams.application-id,则会出现此异常。因此,只需更改Kafka Streams应用的id即可。
或者在流配置中手动设置目录选项StreamsConfig.STATE_DIR_CONFIG。
jvidinwx5#
以上设置状态目录的答案对我来说非常好。谢谢。添加一个观察,可能对使用spring-boot的人有帮助。当在同一台机器上工作并试图启动多个kafka流应用程序示例时,如果您启用了属性spring.devtools.restart.enabled(在开发配置文件中通常是这种情况),您可能希望禁用它,因为当同一个应用程序示例自动重新启动时,它可能不会获得存储锁。这就是我所面临的问题,并且能够通过禁用重新启动行为来解决。
4urapxun6#
在我的例子中,指定一个单独的
@TestConfiguration
类,在这个类中,我为每个SpringBoot测试上下文指定一个改变应用程序名称的计数器。当然,我必须启用SpringBean覆盖来替换主配置。
编辑:我使用的是SpringBoot v.2.5.10,所以在我的例子中,为了使用
@TestConfiguration
,我必须将它传递给@SpringBootTest(classes =)
注解。