Spring Boot: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table

pieyvz9o posted on 2023-10-16 in Spring

I get the following error when starting my Spring Boot application.

org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.28.jar:5.3.28]
    at java.lang.Iterable.forEach(Iterable.java:86) ~[?:2.9 (09-29-2022)]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:937) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.28.jar:5.3.28]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.13.jar:2.7.13]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731) ~[spring-boot-2.7.13.jar:2.7.13]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.13.jar:2.7.13]
    at org.springframework.boot.SpringApplication.run([...])
    [...] ~[?:1.8.0]
    at java.lang.reflect.Method.invoke(Method.java:508) ~[?:[...]]
    at org.springframework.boot.devtools.restart.RestartLauncher.run([...])
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:371) ~[spring-kafka-2.9.1.jar:2.9.1]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.28.jar:5.3.28]
    [...]
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.apache.kafka.streams.processor.internals.TopologyMetadata.getNumStreamThreads(TopologyMetadata.java:215) ~[kafka-streams-3.1.2.jar:?]
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:916) ~[kafka-streams-3.1.2.jar:?]
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:845) ~[kafka-streams-3.1.2.jar:?]
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:751) ~[kafka-streams-3.1.2.jar:?]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:349) ~[spring-kafka-2.9.1.jar:2.9.1]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.28.jar:5.3.28]
    ... 19 more
Code:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.EnableKafkaStreams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.application.id}")
    private String applicationId;

    @Bean
    public KafkaStreams kafkaStreams(KafkaStreamsConfiguration streamsConfig) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> kStream = builder.stream("retry-events", Consumed.with(Serdes.String(), Serdes.String()));

        // Define a custom ObjectMapper for JSON parsing
        ObjectMapper objectMapper = new ObjectMapper();

        // Use mapValues to parse JSON and extract the timestamp
        kStream = kStream.mapValues(value -> {
            try {
                JsonNode jsonNode = objectMapper.readTree(value);
                long timestamp = jsonNode.get("timestamp").asLong();
                long tenMinutesAgo = System.currentTimeMillis() / 60000; // Current time in minutes
                if (timestamp <= tenMinutesAgo) {
                    return value; // Keep messages that match the criteria
                } else {
                    return null; // Filter out messages that don't match the criteria
                }
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }).filter((key, value) -> value != null); // Remove messages that failed parsing

        kStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

}
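
Separately from the startup error, the cutoff in the code above deserves a second look: `System.currentTimeMillis() / 60000` is the number of minutes since the epoch, not a point in time ten minutes ago. A minimal sketch of the comparison, assuming the `timestamp` field holds epoch milliseconds and the intent is to keep only messages that are at least ten minutes old (the class and method names are hypothetical):

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of the original question. */
final class RetryAgeCheck {

    // Assumed minimum age, taken from the variable name tenMinutesAgo in the question.
    static final Duration MIN_AGE = Duration.ofMinutes(10);

    private RetryAgeCheck() {
    }

    /**
     * Returns true when timestampMillis (assumed to be epoch milliseconds)
     * lies at least MIN_AGE before nowMillis.
     */
    static boolean isOldEnough(long timestampMillis, long nowMillis) {
        long cutoffMillis = nowMillis - MIN_AGE.toMillis();
        return timestampMillis <= cutoffMillis;
    }
}
```

Inside the `mapValues` step this could be called as `RetryAgeCheck.isOldEnough(timestamp, System.currentTimeMillis())`. Passing the current time as a parameter keeps the check easy to test.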
lg40wkob #1

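When you use Spring Kafka Streams, you do not need to define and create a KafkaStreams bean yourself. Spring does that for you, but you do need to give it some topology to run, as you have noticed.
The simplest way to configure this is to declare a bean of type KStream<K, V> (with whatever key and value types you have chosen) and have the StreamsBuilder injected into it.
It would look like this: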

@Bean
public KStream<String, String> topologyBuilder(StreamsBuilder streamsBuilder, YourTopologyClass topo) {
    return topo.processStream(streamsBuilder);
}

Your YourTopologyClass would look like this:

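@Component // registered as a bean so it can be injected into the @Bean method above
class YourTopologyClass {

    public KStream<String, String> processStream(StreamsBuilder streamsBuilder) {
        // Your topology construction goes here. At minimum, subscribe to a
        // source topic, for example the one from the question:
        KStream<String, String> stream = streamsBuilder.stream("retry-events");
        return stream;
    }

}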
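
Applied to the question, the approach above might look like the following sketch. It assumes Spring Boot's auto-configuration provides the Kafka Streams properties (for example `spring.kafka.streams.application-id` and `spring.kafka.bootstrap-servers`). The class and bean names are made up for illustration, and the JSON timestamp filtering from the question is left out for brevity.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

// Hypothetical configuration class; requires spring-kafka and kafka-streams on the classpath.
@Configuration
@EnableKafkaStreams
public class RetryTopologyConfiguration {

    // The StreamsBuilder injected here comes from the defaultKafkaStreamsBuilder
    // factory bean, so sources registered on it become part of the topology
    // that Spring starts.
    @Bean
    public KStream<String, String> retryStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream(
                "retry-events", Consumed.with(Serdes.String(), Serdes.String()));

        // Drop null values, then forward everything else to the output topic.
        stream.filter((key, value) -> value != null)
              .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // No new KafkaStreams(...) and no start() call: Spring manages the lifecycle.
        return stream;
    }
}
```

The key point is that the source is registered on the injected StreamsBuilder rather than on a `new StreamsBuilder()`, so the topology that Spring starts is no longer empty.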
myzjeezk #2

Removing @EnableKafkaStreams made my application start without any errors.
