我想读Flink的Kafka主题
package Toletum.pruebas;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
public class LeeKafka {
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer082 kafkaSrc = new FlinkKafkaConsumer082("test02",
new SimpleStringSchema(),
parameterTool.getProperties());
DataStream messageStream = env.addSource(kafkaSrc);
messageStream.rebalance().map(new MapFunction() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();
env.execute("LeeKafka");
}
}
此代码成功运行:
java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
但是,当我试着用Flink的话:
flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
我得到一个错误:
java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49)
at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
2条答案
按热度按时间ubof19bj1#
此问题是由于使用旧版本的flink连接器库造成的。
您可以检查最新的可用库并下载最新的maven依赖项。
你正在使用的Kafka版本也应该考虑在内。
尝试使用来自flink文档的kafka连接器的最新maven依赖项
最新的maven依赖是
xxslljrj2#
旧版本库。。。。。
正确的pom.xml: