Unable to create a producer with FlinkKafkaProducer on Flink 1.8.* and Kafka (Confluent) 5.2

Posted by hc2pp10m on 2021-06-04 in Kafka

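Creating the Apache Flink Kafka producer fails when using Flink 1.8.*.
The same happens with Flink versions below 1.8.*, such as 1.7.*. The error message shown further down is displayed.
The producer is created like this: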

new FlinkKafkaProducer(topic, new SimpleStringSchema(), kafkaConnectionProperties);

The error log is attached below:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:68)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:889)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:960)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:956)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:708)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:382)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:859)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(Ljava/util/List;ILjava/util/Map;)V
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:53)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:43)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
    ... 18 more
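
The `NoSuchMethodError` in the trace says that the `CachedSchemaRegistryClient` class loaded at runtime has no constructor taking `(List, int, Map)`, which is the one `KafkaAvroSerializer` tried to call. One plausible cause, which is an assumption and not confirmed by anything in this post, is that an older schema registry client ends up on the classpath next to the 5.2.2 serializer. A minimal sketch for checking this from inside the job's classpath could look as follows; the class name `ClasspathProbe` and its helper methods are hypothetical and only serve the illustration.

```java
import java.lang.reflect.Constructor;
import java.security.CodeSource;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper: reports which jar a class was loaded from
// and whether it has the constructor named in the NoSuchMethodError.
public class ClasspathProbe {

    /** Returns the jar or directory the class was loaded from, if the JVM exposes it. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** True if the class has a public constructor with exactly these parameter types. */
    public static boolean hasConstructor(Class<?> type, Class<?>... parameterTypes) {
        try {
            type.getConstructor(parameterTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the "Caused by" line of the stack trace.
        String className = args.length > 0
                ? args[0]
                : "io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient";
        try {
            Class<?> type = Class.forName(className);
            System.out.println("loaded from: " + locationOf(type));
            // (Ljava/util/List;ILjava/util/Map;)V corresponds to (List, int, Map).
            System.out.println("has (List, int, Map) constructor: "
                    + hasConstructor(type, List.class, int.class, Map.class));
            for (Constructor<?> c : type.getConstructors()) {
                System.out.println("  " + c);
            }
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

Run with the same classpath as the Flink job, this prints where the class comes from and which public constructors it has. On the build side, `gradle dependencyInsight --dependency kafka-schema-registry-client --configuration compile` should show which dependencies pull that artifact in and at which version.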

The build configuration is added below:

ext {
    javaVersion = '1.8'
    flinkVersion = '1.8.1'
    scalaBinaryVersion = '2.12'
    confluentVersion = '5.2.2'
}

dependencies {

    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    compile group: "org.apache.flink", name: "flink-connector-kafka_${scalaBinaryVersion}", version: flinkVersion
    compile group: "org.apache.flink", name: "flink-avro-confluent-registry", version: flinkVersion
    compile "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}"
    compile "org.apache.flink:flink-table-planner_${scalaBinaryVersion}:${flinkVersion}"
    compile group: "org.apache.flink", name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: flinkVersion
    compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentVersion
}
