当我在本地使用spark流应用程序时,每个记录只读取一次,但是当我在独立集群上部署它时,它会从kafka读取两次相同的消息。另外,我已经仔细检查过了,这不是Kafka制作人的问题。
我就是这样创建流的:
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
Subscribe[String, String]("aTopic", kafkaParams))
这是kafkaparams配置:
"bootstrap.servers" -> KAFKA_SERVER,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test-group",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
集群有2个从机,每个从机有一个工作机,看起来每个工作机接收相同的消息。有人能帮我吗?
编辑
举个例子,当我从Kafka那里发一个点。根据此代码:
stream.foreachRDD((rdd, time) => {
if (!rdd.isEmpty) {
log.info("Taken "+rdd.count()+ " points")
}
}
我获得 "Taken 2 points"
. 如果我把它们打印出来,它们是相等的。我做错什么了吗?
我在用
“org.apache.spark”%%“spark-streaming-kafka-0-10”%%“2.2.0”
Spark2.2.0
Kafka2.11-0.11.0.1
暂无答案!
目前还没有任何答案,快来回答吧!