spark上的Kafka只能读取实时摄取

juud5qan  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(356)

Spark版本=2.3.0
Kafka版本=1.0.0
正在使用的代码:


# Kafka Enpoints

zkQuorum = '192.168.2.10:2181,192.168.2.12:2181' 
topic = 'Test_topic'

# Create a kafka Stream

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "cyd-demo-azureactivity-streaming-consumer", {topic: 1})

当kafka流实时运行时,我会看到spark拉取数据,但是如果我在spark前一小时启动kafka,它将不会提取一小时前的数据。
这是预期的还是有办法在配置中进行设置?
代码运行使用:

sudo $SPARK_HOME/spark-submit --master local[2] --jars /home/steven/jars/elasticsearch-hadoop-6.3.2.jar,/home/steven/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/steven/code/demo/test.py
vsaztqbk

vsaztqbk1#

如果总是需要从一开始就提取数据,则需要设置kafka属性“ auto.offset.reset “至” earliest ". 这将从一开始就把记录拉出来。
此参数是kafka consumer config-http://kafka.apache.org/documentation.html#newconsumerconfigs
参考链接-https://spark.apache.org/docs/2.3.0/streaming-kafka-0-10-integration.html
createstream有多个实现。。你可以使用一个你可以通过Kafka配置。创建流的示例代码-

val kafkaParams = Map(
"zookeeper.connect" -> "zookeeper1:2181",
"group.id" -> "spark-streaming-test",
"auto.offset.reset" -> "earliet"
)

val inputTopic = "input-topic"

val stream =  KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic-> 1), StorageLevel.MEMORY_ONLY_SER)

相关问题