我正在构建一个应用程序,以获得KafkaStreams的实践经验,到目前为止,我已经成功了。然而,当我特别尝试构建一个使用Avro序列化的主题的Stream时,它失败了。
使用者组ID已在集群中注册,但未订阅主题。如下图所示。
第一列显示其消费者数量,第二列显示主题数量
下面的代码是我对Kafka Stream的配置。
public static void main(String[] args) {
//Defining the properties for the stream
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Test-stream-UserRegistrationServicebbb");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
//Defining the serde for the value
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
SpecificAvroSerde<Pending_Registrations> pendingRegistrationsSerde = new SpecificAvroSerde<>();
pendingRegistrationsSerde.configure(serdeConfig, false);
StreamsBuilder builder = new StreamsBuilder();
//Creating a stream from the topic with specific serde
KStream<String, Pending_Registrations> userStream = builder.stream("User.Pending-Registrations",
Consumed.with(Serdes.String(), pendingRegistrationsSerde));
//Printing the stream
userStream.foreach((key, value) -> System.out.println("key: " + key + " value: " + value));
//starting the stream
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
Pending_Registration
类是从Avro模式(特别是从.avsc文件)生成的。
除了Java应用程序之外的所有服务都使用Docker部署在端口上:zookeeper:2181,Kafka:9092,schema-registry:8081,汇合控制中心:9021应用程序编译和运行没有任何错误或崩溃,它只是不打印出任何东西,也没有找到任何主题。
我真的很感激任何帮助,因为我花了5个小时试图找出我错过了什么。:)
我试着遵循多个指南,以找到我们的代码之间的任何差异,但不幸的是,他们的解决方案似乎没有在我的情况下工作。
一个普通的消费者可以很好地使用avro,并且能够反序列化所有存储的消息:
public static void main(String[] args) {
Properties streamConfig = new Properties();
streamConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-normal-UserRegistrationService");
streamConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
streamConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
streamConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
streamConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, Pending_Registrations> consumer = new KafkaConsumer<>(streamConfig);
consumer.subscribe(java.util.Collections.singletonList("User.Pending-Registrations"));
while (true) {
consumer.poll(java.time.Duration.ofMillis(100)).forEach(record -> {
System.out.println("key: " + record.key() + " value: " + record.value());
});
}
}
当我注解掉模式注册表url configs,SerdeConfig时,没有抛出错误。
1条答案
按热度按时间bttbmeg01#
非常感谢@OneCricketeer帮助我解决了我忽略的问题。
事实证明,我使用的Kafka Stream和Confluent Platform版本并不兼容。使用版本后:confluentinc/cp-server:7.4.0 'kafka-streams',版本:'3.4.1'
问题解决了,Kafka流可以开始流。