我试图阅读Kafka经纪人与Spark流,但我面临着一些问题。
def spark_streaming_from_STABLE_kafka_topic():
conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
topic = "stable_topic"
kvs = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": "my-broker",
"auto.offset.reset": "smallest"},
keyDecoder=lambda x: x,
valueDecoder=lambda x: x
)
lines = kvs.window(2, 2).map(lambda x: x[1])
lines.pprint()
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
ssc.start()
ssc.awaitTermination()
上面的代码只获取空批:
-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------
-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------
主题 stable_topic
包含固定大小的数据。它不会改变。我还有一个主题,它每秒钟接收一次数据。如果我用这个主题代替 stable_topic
然后取下 "auto.offset.reset": "smallest"
然后代码获取数据。
我想这是有问题的 {"auto.offset.reset": "smallest"}
但我想不通。
有人知道我做错了什么吗?
1条答案
按热度按时间fnx2tebb1#
在以后的版本中,
smallest
被替换为earliest
. 确保您检查了所使用版本的文档。此外,还有
auto.offset.reset
如果使用者组已在使用主题中的某些数据,则配置将不会生效stable_topic
. 因此,您可以考虑更改group.id
在你的流媒体工作中。如果要分配新的
group.id
,确保设置auto.offset.reset
至smalles
(或earliest
在较新版本中)。