尝试编写一个spark流式处理作业,该作业使用来自kafka的消息。以下是我迄今为止所做的:
Zookeeper
启动kafka服务器
向服务器发送了一些消息。运行以下命令时可以看到它们:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
现在正在尝试编写一个程序来计算5分钟内收到的#条消息。
代码如下所示:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不确定第三个参数(消费者组)要使用什么值。当我运行这个时 Unable to connect to zookeeper server
. 但Zookeeper在左舷奔跑 2181
; 否则第三步就不起作用了。
好像我没用 KafkaUtils.createStream
适当地。有什么想法吗?
5条答案
按热度按时间iq0todco1#
我也面临同样的问题。这是对我有效的解决方案。
分配给spark流应用程序的核心数必须大于接收器数。否则,系统将接收数据,但无法处理它。因此,Spark流至少需要两个核心。所以在我的spark提交中,我应该提到至少两个核心。
kafka-clients-version.jar应该包含在spark submit中的依赖jar列表中。
crcmnpdw2#
我认为你应该指定zookeeper的ip而不是localhost。另外,第三个参数是消费者组名称。它可以是你喜欢的任何名字。当您有多个消费者绑定到同一组时,主题分区会相应地分布。您的tweet应该是:
9w11ddsr3#
我认为,在您的代码中,调用kafkautils.createstream的第二个参数应该是host:port of kafka服务器,而不是zookeeper主机和端口。检查一下。
编辑:kafka utils api文档
根据上述文件,应为法定人数。所以应该使用zookeeper主机名和端口。
zkquorum zookeeper法定人数(hostname:port,hostname:port,..).
sxpgvts34#
没有所谓的默认消费者群体。您可以在那里使用任意非空字符串。如果你只有一个消费者,那么它的消费者群体并不重要。如果有两个或两个以上的消费者,他们可以是同一消费群体的一部分,也可以属于不同的消费群体。
从http://kafka.apache.org/documentation.html :
消费者
...
如果所有使用者示例都有相同的使用者组,那么这就像传统的队列平衡使用者的负载一样。
如果所有使用者示例都有不同的使用者组,那么这就像publish-subscribe一样工作,所有消息都广播给所有使用者。
我认为问题可能出在“topics”参数中。来自spark docs:
要使用的(主题名称->numpartitions)的Map。每个分区在它自己的线程中使用
您只为主题指定了一个分区,即“1”。根据代理的设置(num.partitions),可能有多个分区,并且您的消息可能会被发送到程序未读取的其他分区。
另外,我相信分区是基于0的。因此,如果只有一个分区,它的id将等于0。
cnwbcb6i5#
如果zookeeper与流媒体应用程序在同一台计算机上运行,则“localhost:2181“会有用的。否则,您必须提及zookeeper正在运行的主机的地址,并确保正在运行流应用程序的计算机能够与端口2181上的zookeeper主机通信。