构造Kafka生产者失败

hlswsv35  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(586)

我正在使用kafka版本0.11.0.0,并尝试通过从avro文件加载数据来创建输入流。但它未能示例化生产者,但出现以下异常:

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 0 ms.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:415)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
    at wordcount.PayloadProducer.produceInputs(PayloadProducer.java:42)
    at wordcount.PayloadProducer.main(PayloadProducer.java:24)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/JsonProcessingException
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:47)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:73)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:42)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.core.JsonProcessingException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 9 more

我的工作如下链接:合流Kafka流的例子
用于示例化生产者的代码:

final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    io.confluent.kafka.serializers.KafkaAvroSerializer.class);
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

            GenericRecordBuilder pageViewBuilder = new GenericRecordBuilder(loadSchema("payload.avsc"));

            KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
7vhp5slm

7vhp5slm1#

如例外所述,你错过了 com.fasterxml.jackson.core.JsonProcessingException
您是克隆了存储库,还是只是复制粘贴了代码?你需要一个正确的答案 .pom ,就像这个:https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/pom.xml 包括所有正确的从属关系,包括Jackson。
我建议克隆存储库(至少从这里开始:https://github.com/confluentinc/examples/tree/3.3.0-post/kafka-streams)并遵循自述文件中所述的命令和要求。

相关问题