Kafka and Spark Streaming: unable to read messages

Posted by goqiplq2 on 2021-05-30 in Hadoop

I am integrating Kafka with Spark using Spark Streaming. On the Kafka side, I created a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

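I publish messages with Kafka and try to read them with Spark Streaming Java code and print them to the screen.
All the daemons are up: Spark master and worker, ZooKeeper, and Kafka.
I am writing the Java code using KafkaUtils.createStream.
The code is as follows: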

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();

        jssc.start();
        jssc.awaitTermination();

    }
}

While the job is running, I run the Kafka producer in another terminal to publish messages:

Hi kafka
second message
another message

But the Spark Streaming console log does not show the messages. Instead it reports that zero blocks were received:

-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

Why are no data blocks received? I have tried the console producer and consumer (bin/kafka-console-producer.... and bin/kafka-console-consumer...) and they work perfectly, so why doesn't my code? Any ideas?

fbcarpbf #1

Yes, you need to access the content from the DStream.

messages.foreachRDD(<<processing for the input received in the interval>>);

kb5ga3dv #2

The problem is solved.
The code above is correct. We only need to add two more lines to suppress the [INFO] and [WARN] log output. The final code is:

package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

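        // Silence Spark and Akka logging so the output of print() is easy to see.
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            // The value is the number of consumer threads used for this topic.
            topicMap.put(t, Integer.valueOf(3));
        }

        // local[4] runs four local threads, so the receiver does not occupy the only core.
        JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));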
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();

        jssc.start();
        jssc.awaitTermination();

    }
}
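
The topic map built in the code above maps each topic name to the number of consumer threads that the receiver-based KafkaUtils.createStream uses for it. Those threads all run inside a single receiver, so raising the value from 1 to 3 does not change how many cores the receiver occupies. As a minimal sketch in plain Java with no Spark dependency, the parsing step could be isolated as below. The class name TopicMapSketch and the method parseTopics are hypothetical names introduced for illustration, and the trimming and skipping of empty entries is an added assumption, not something the original code does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Spark or Kafka.
class TopicMapSketch {

    /**
     * Turns "topic1,topic2,..." into a map of topic name -> consumer thread count.
     * Blank entries are skipped and surrounding whitespace is trimmed (an assumption
     * added here; the original loop stores each split token as-is).
     */
    static Map<String, Integer> parseTopics(String csv, int threadsPerTopic) {
        if (threadsPerTopic < 1) {
            throw new IllegalArgumentException("threadsPerTopic must be >= 1");
        }
        Map<String, Integer> topics = new LinkedHashMap<>();
        for (String name : csv.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                topics.put(trimmed, threadsPerTopic);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        // prints {test=3, events=3}
        System.out.println(parseTopics("test, events,,", 3));
    }
}
```

The resulting map has the same shape as topicMap and could be passed to createStream in its place.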

We also need to add a dependency to pom.xml:

<dependency>
<groupId>com.msiops.footing</groupId>
<artifactId>footing-tuple</artifactId>
<version>0.2</version>
</dependency>

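This dependency was added in order to use scala.Tuple2. (scala.Tuple2 itself is part of scala-library, which spark-core pulls in transitively.)

The message "Stream 0 received 0 blocks" appeared because no usable Spark worker was available: the worker's core count was set to 1. A receiver-based stream occupies one core for the receiver itself, so Spark Streaming needs at least 2 cores, one to receive data and one to process the batches. The fix is a change to the Spark configuration file (see the installation manual).

Add the line export SPARK_WORKER_CORE=5 (Spark's standalone-mode documentation spells this variable SPARK_WORKER_CORES). Also change SPARK_MASTER='hostname' to SPARK_MASTER=<your local IP>. This local IP is the one shown in bold in the Spark web UI, something like spark://192.168..:<port>. Only the IP is needed here, not the port.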
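
The core-count rule above can be checked before a job is submitted. The following is a rough sketch in plain Java, assuming one core per receiver and handling only the local, local[N] and local[*] master strings. The class ReceiverCoreCheck and its methods are hypothetical names introduced for illustration and are not part of Spark. For a cluster URL such as spark://host:port the sketch cannot know the core count, which is then governed by the worker configuration.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spark.
class ReceiverCoreCheck {

    // Matches "local", "local[4]" or "local[*]"; other master forms are not handled here.
    private static final Pattern LOCAL = Pattern.compile("local(?:\\[(\\d+|\\*)\\])?");

    /** Thread count implied by a local-mode master string, or empty for anything else. */
    static OptionalInt localThreads(String master) {
        Matcher m = LOCAL.matcher(master.trim());
        if (!m.matches()) {
            return OptionalInt.empty();
        }
        String n = m.group(1);
        if (n == null) {
            return OptionalInt.of(1); // plain "local" runs a single thread
        }
        if (n.equals("*")) {
            return OptionalInt.of(Runtime.getRuntime().availableProcessors());
        }
        return OptionalInt.of(Integer.parseInt(n));
    }

    /** True when at least one core is left for batch processing after each receiver takes one. */
    static boolean hasRoomForProcessing(int cores, int receivers) {
        return cores > receivers;
    }

    public static void main(String[] args) {
        String[] masters = {"local", "local[4]", "spark://192.168.88.130:7077"};
        for (String master : masters) {
            OptionalInt threads = localThreads(master);
            if (threads.isPresent()) {
                System.out.println(master + " -> " + threads.getAsInt()
                        + " thread(s), enough for 1 receiver: "
                        + hasRoomForProcessing(threads.getAsInt(), 1));
            } else {
                System.out.println(master + " -> not a local master; check the worker's cores");
            }
        }
    }
}
```

With one receiver, a single core fails the check and local[4] passes it, which matches the behaviour described in this answer.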
Now restart the Spark master and the Spark worker, and start streaming :)
Output:

-------------------------------------------
Time: 1417443060000 ms
-------------------------------------------
message 1

-------------------------------------------
Time: 1417443061000 ms
-------------------------------------------
message 2

-------------------------------------------
Time: 1417443063000 ms
-------------------------------------------
message 3
message 4

-------------------------------------------
Time: 1417443064000 ms
-------------------------------------------
message 5
message 6
messag 7

-------------------------------------------
Time: 1417443065000 ms
-------------------------------------------
message 8
