Spring Boot 流异常:无法初始化状态,如果Kafka Streams的多个示例在同一状态目录中运行,则可能发生这种情况

cbwuti44  于 2022-11-05  发布在  Spring
关注(0)|答案(6)|浏览(111)

这是关于将生产中使用窗口的现有代码库从kafka-clients、kafka-streams、spring-kafka 2.4.0升级到2.6.x,以及将spring-boot-starter-parent从2.2.2.RELEASE升级到2.3.x,因为2.2与kafka-streams 2.6不兼容。
现有代码中的这些Bean在旧版本(2.4.0、2.2 Spring 发布版)中有如下所述:

@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder)  {
 //Your topology code
 return streamsBuilder.build();
}

@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
//Your kafka streams code
return kafkaStreams;
}

现在,在将kafka streams、kafka clients升级到2.6.2,并将spring kafka升级到2.6.x之后,观察到以下异常:

2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN   o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
piztneat

piztneat1#

这里的问题是较新版本的spring-kafka自动初始化基于拓扑bean的kafka流的另一个示例,并且generickafkaStreams的另一个bean从现有代码库初始化,这导致多个线程试图锁定状态目录,从而导致错误。
即使在Spring Boot 级别禁用KafkaAutoConfiguration也不会禁用这种行为。这是一个很难识别的问题,并且浪费了很多时间。
修复方法是去掉拓扑bean,并使用我们自己的自定义kafka streams bean,如下所示:

protected Topology customTopology()  {

    //topology code
    return streamsBuilder.build();
    }

    /**
     * This starts kafka stream application and sets the state listener and state
     * store listener.
     * 
     * @return KafkaStreams
     */
    @Bean("GenericKafkaStreams")
    public KafkaStreams kStream() {
    KafkaStreams kafkaStreams = new KafkaStreams(customTopology(), kstreamsconfigs);
    return kafkaStreams;
    }
fzwojiic

fzwojiic2#

如果您在Spring Cloud Streams Kafka Streams Binder 3.0风格的应用程序中有一个复杂的Kafka Streams拓扑,您可能需要为不同的函数指定不同的应用程序ID,如下所示:

spring.cloud.stream.function.definition: myFirstStream;mySecondStream
...
spring.cloud.stream.kafka.streams:
  binder:
    functions:
      myFirstStream:
        applicationId: app-id-1
      mySecondStream:
        applicationId: app-id-2
h7appiyu

h7appiyu3#

当您在same machine上运行多个相同的application(name/id)时,可能会发生类似的错误。
请访问State.dir以获得此想法。
您可以在Kafka配置中添加它,并使它对每个示例都是唯一的,
如果您使用的是spring cloud stream(不能在同一台机器上使用相同的端口):

spring.cloud.stream.kafka.streams.binder.configuration.state.dir: ${spring.application.name}${server.port}

更新:

spring stream kafka的情况下:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration kStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(APPLICATION_ID_CONFIG, springApplicationName);
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(StreamsConfig.STATE_DIR_CONFIG, String.format("%s%s", springApplicationName, serverPort));

    return new KafkaStreamsConfiguration(props);
}

或:

spring.kafka:
    bootstrap-servers: ....
    streams:
      properties:
        application.server: localhost:${server.port}
        state.dir: ${spring.application.name}${server.port}
2g32fytz

2g32fytz4#

我处理过以下版本的问题:

  • Boot 程序2.5.3版
  • Kafka:Spring-Kafka:2.7.5
  • Kafka:Kafka-客户端:2.8.0
  • Kafka:Kafka-流:2.8.0

检查此:状态目录
默认情况下,它是在临时文件夹中创建的,kafka streams应用程序ID如下:/var/文件夹/xw/xgslnvzj 1 zj 6 wp86 wpd 8hqjr 0000 gn/T/Kafka流/${Spring.Kafka.流.应用程序ID}/.lock

如果两个或多个Kafka Streams应用使用相同的 spring.kafka.streams.application-id,则会出现此异常。因此,只需更改Kafka Streams应用的id即可。

或者在流配置中手动设置目录选项StreamsConfig.STATE_DIR_CONFIG。

jvidinwx

jvidinwx5#

以上设置状态目录的答案对我来说非常好。谢谢。添加一个观察,可能对使用spring-boot的人有帮助。当在同一台机器上工作并试图启动多个kafka流应用程序示例时,如果您启用了属性spring.devtools.restart.enabled(在开发配置文件中通常是这种情况),您可能希望禁用它,因为当同一个应用程序示例自动重新启动时,它可能不会获得存储锁。这就是我所面临的问题,并且能够通过禁用重新启动行为来解决。

4urapxun

4urapxun6#

在我的例子中,指定一个单独的@TestConfiguration类,在这个类中,我为每个SpringBoot测试上下文指定一个改变应用程序名称的计数器。

@TestConfiguration
public class TestKafkaStreamsConfig {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        final var props = new HashMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-id-" + COUNTER.getAndIncrement());

//      rest of configuration  
        return new KafkaStreamsConfiguration(props);
    }
}

当然,我必须启用SpringBean覆盖来替换主配置。
编辑:我使用的是SpringBoot v.2.5.10,所以在我的例子中,为了使用@TestConfiguration,我必须将它传递给@SpringBootTest(classes =)注解。

相关问题