我正在使用Azure Databricks来运行scala notebook,这些notebook读取了几个Kafka主题。现在,它们连续运行,但我希望它们在10分钟后结束,并允许调度程序稍后重新启动作业。
(This是出于成本原因;我不希望集群连续运行,也不介意消息在主题中积累,直到下一次运行。)
这是它现在的样子:
| 命令|内容|
| --|--|
| CMD 1|瓦尔df 1 = spark.readStream().subscribe(“topic 1”)..load()|
| CMD 2| df1.writeStream.outputMode(“append”)。.start()|
| CMD 3|瓦尔df 2 = spark.readStream().subscribe(“topic 2”)..load()|
| CMD 4|//类似于df 2|
= =-
那么问题是,我如何告诉df 1和df 2在10分钟后完成他们当前的批处理并优雅地关闭笔记本?
我对数据块不熟悉,所以从创建流时阅读可用的选项开始:https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
似乎没有人说“X秒后就认输”。我不能使用kafkaConsumer.pollTimeoutMs和fetchOffset.numRetries选项,因为经常会有少量记录进来。
1条答案
按热度按时间7vux5j2d1#
您可以使用awaitTermination在指定时间后停止流。您可以使用下面的代码为您的流。
这将在5分钟后停止流。