从kafka控制台生成器将数据读入flink程序

xhv8bpkk  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我创建了一个名为 test 并使用控制台生成器在控制台中编写了一些字符串。

./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

幸运的是,我能够通过使用 console-consumer . 现在,我想使用 console-consumer 在flink程序中使用下面的代码

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        DataStream<String> message = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties));

        message.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return " Value: " + value;
            }
        }).print();

        env.execute();

    } //main
} //ReadFromKafka

pom.xml的内容如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.stsffap</groupId>
    <artifactId>cep-monitoring</artifactId>
    <name>cep-monitoring</name>
    <version>1.0</version>

    <packaging>jar</packaging>

    <properties>
        <flink.version>1.0.1</flink.version>

        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>

        <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.2</version>
    </dependency>

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
            <version>1.3.2</version>

        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>

                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerId>jdt</compilerId>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.eclipse.tycho</groupId>
                        <artifactId>tycho-compiler-jdt</artifactId>
                        <version>0.21.0</version>
                    </dependency>
                </dependencies>
            </plugin>

        </plugins>
    </build>
</project>

每当我执行这个代码时,我都会遇到以下错误

objc[892]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x109f654c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10afd44e0). One of the two will be used. Which one is undefined.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedFunction
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.stsffap.cep.monitoring.ReadFromKafka.main(ReadFromKafka.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 25 more

也是Kafka的一个版本,我使用下面的find by命令

find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

kafka_2.11-0.9.0.0-javadoc.jar

我是否需要使用.8.x版本的Kafka来运行我的示例?

对意见和建议高度赞赏。提前谢谢。祝你好运!

gopyfrb3

gopyfrb31#

我的程序开始工作时做了以下修改,我把Kafka的版本更新到了.9.x

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
            <version>${flink.version}</version>

        </dependency>

我将flink版本从1.0.1升级到1.1.2,如下所示

<properties>
        <!-- <flink.version>1.0.1</flink.version>-->

        <flink.version>1.1.2</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

        <dependency>

    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>${flink.version}</version>
    </dependency>

相关问题