我用下面的代码创建了6个输入数据流,这些数据流使用directaproach从kafka的6分区主题中读取,我发现即使为流指定相同的组id,数据也会重复6次。如果我只创建3个数据流,我会将数据重复3次,以此类推。。。。
numStreams = 6
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], {
"metadata.broker.list": brokers,
"fetch.message.max.bytes": "20971520",
"spark.streaming.blockInterval" : "2000ms",
"group.id" : "the-same"},
valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)]
kvs = ssc.union(*kafkaStreams)
我做错什么了?
3条答案
按热度按时间vwhgwdsa1#
在直接方法中,您不应该从一个主题创建许多数据流。
根据文件:
简化的并行性:不需要创建多个输入kafka流并合并它们。使用directstream,spark streaming将创建与kafka分区数量相同的rdd分区,这些分区都将并行地从kafka读取数据。所以kafka和rdd分区之间有一对一的Map,这更容易理解和调整。
所以只要创建一个dstream,spark就会使用所有kafka分区:)
rqqzpn5f2#
我不熟悉python,但是sparkscala中的直接流没有提交任何偏移量。因此,如果您在不提交任何已读消息的偏移量的情况下打开流n次,那么您的使用者将从一开始就开始。
如果在python中是相同的,则不需要启动n个流。启动一个流,spark将处理分区分配给执行者/任务本身。
vdgimpew3#
基本上,kafka主题是通过共享负载来为多个接收器/使用者进行更快的分发的。默认情况下,当您创建数据流时,一个接收器将运行并通过接收器线程(java线程)从每个kafka主题分区到数据流分区并行地接收数据。如果为一个主题创建6个数据流,则意味着为同一主题创建6个接收器,这并不意味着为每个部分创建每个数据流。每个接收器每次接收一个信号,所以你每次接收6个信号。