KafkaStream无法使用Avro从主题中流式传输数据

kxkpmulp  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(115)

我正在构建一个应用程序,以获得KafkaStreams的实践经验,到目前为止,我已经成功了。然而,当我特别尝试构建一个使用Avro序列化的主题的Stream时,它失败了。
使用者组ID已在集群中注册,但未订阅主题。如下图所示。
第一列显示其消费者数量,第二列显示主题数量

下面的代码是我对Kafka Stream的配置。

public static void main(String[] args) {
        //Defining the properties for the stream
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Test-stream-UserRegistrationServicebbb");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        //Defining the serde for the value
        Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        SpecificAvroSerde<Pending_Registrations> pendingRegistrationsSerde = new SpecificAvroSerde<>();
        pendingRegistrationsSerde.configure(serdeConfig, false);

        StreamsBuilder builder = new StreamsBuilder();
        //Creating a stream from the topic with specific serde
        KStream<String, Pending_Registrations> userStream = builder.stream("User.Pending-Registrations",
                Consumed.with(Serdes.String(), pendingRegistrationsSerde));
        //Printing the stream
        userStream.foreach((key, value) -> System.out.println("key: " + key + " value: " + value));
        //starting the stream
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

Pending_Registration类是从Avro模式(特别是从.avsc文件)生成的。
除了Java应用程序之外的所有服务都使用Docker部署在端口上:zookeeper:2181,Kafka:9092,schema-registry:8081,汇合控制中心:9021应用程序编译和运行没有任何错误或崩溃,它只是不打印出任何东西,也没有找到任何主题。
我真的很感激任何帮助,因为我花了5个小时试图找出我错过了什么。:)
我试着遵循多个指南,以找到我们的代码之间的任何差异,但不幸的是,他们的解决方案似乎没有在我的情况下工作。
一个普通的消费者可以很好地使用avro,并且能够反序列化所有存储的消息:

public static void main(String[] args) {
        Properties streamConfig = new Properties();
        streamConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-normal-UserRegistrationService");
        streamConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
        streamConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        streamConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        streamConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, Pending_Registrations> consumer = new KafkaConsumer<>(streamConfig);
        consumer.subscribe(java.util.Collections.singletonList("User.Pending-Registrations"));
        while (true) {
            consumer.poll(java.time.Duration.ofMillis(100)).forEach(record -> {
                System.out.println("key: " + record.key() + " value: " + record.value());
            });
        }
    }

当我注解掉模式注册表url configs,SerdeConfig时,没有抛出错误。

bttbmeg0

bttbmeg01#

非常感谢@OneCricketeer帮助我解决了我忽略的问题。
事实证明,我使用的Kafka Stream和Confluent Platform版本并不兼容。使用版本后:confluentinc/cp-server:7.4.0 'kafka-streams',版本:'3.4.1'
问题解决了,Kafka流可以开始流。

相关问题