使用flink 1.8.*时,apache flink kafka生产者创建失败。
同样的事情也适用于低于1.8.的flink版本,比如1.7.。将显示以下错误消息。
制片人看起来像这样,
new FlinkKafkaProducer(topic, new SimpleStringSchema(), kafkaConnectionProperties);
错误日志附在下面
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:68)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:889)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:960)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:956)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:708)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:98)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:382)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:859)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(Ljava/util/List;ILjava/util/Map;)V
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:53)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:43)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
... 18 more
在下面添加了配置
ext {
javaVersion = '1.8'
flinkVersion = '1.8.1'
scalaBinaryVersion = '2.12'
confluentVersion = '5.2.2'
}
dependencies {
compile "org.apache.flink:flink-java:${flinkVersion}"
compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
compile group: "org.apache.flink", name: "flink-connector-kafka_${scalaBinaryVersion}", version: flinkVersion
compile group: "org.apache.flink", name: "flink-avro-confluent-registry", version: flinkVersion
compile "org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}"
compile "org.apache.flink:flink-table-planner_${scalaBinaryVersion}:${flinkVersion}"
compile group: "org.apache.flink", name: "flink-statebackend-rocksdb_${scalaBinaryVersion}", version: flinkVersion
compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentVersion
}
暂无答案!
目前还没有任何答案,快来回答吧!