scala 在数据砖中对Kafka使用readstream,我如何告诉流在一定时间后优雅地自我终止?

wf82jlnq  于 2023-10-18  发布在  Scala
关注(0)|答案(1)|浏览(117)

我正在使用Azure Databricks来运行scala notebook,这些notebook读取了几个Kafka主题。现在,它们连续运行,但我希望它们在10分钟后结束,并允许调度程序稍后重新启动作业。
(This是出于成本原因;我不希望集群连续运行,也不介意消息在主题中积累,直到下一次运行。)

这是它现在的样子:

| 命令|内容|
| --|--|
| CMD 1|瓦尔df 1 = spark.readStream().subscribe(“topic 1”)..load()|
| CMD 2| df1.writeStream.outputMode(“append”)。.start()|
| CMD 3|瓦尔df 2 = spark.readStream().subscribe(“topic 2”)..load()|
| CMD 4|//类似于df 2|
= =-
那么问题是,我如何告诉df 1和df 2在10分钟后完成他们当前的批处理并优雅地关闭笔记本?
我对数据块不熟悉,所以从创建流时阅读可用的选项开始:https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
似乎没有人说“X秒后就认输”。我不能使用kafkaConsumer.pollTimeoutMsfetchOffset.numRetries选项,因为经常会有少量记录进来。

7vux5j2d

7vux5j2d1#

您可以使用awaitTermination在指定时间后停止流。您可以使用下面的代码为您的流。

val  query1 = df.withColumn("key", decode($"key","UTF-8"))
.withColumn("result", decode($"value","UTF-8"))
.select("key", "result", "topic").writeStream.format("console").start()

query1.awaitTermination(5 * 60 * 1000)
query1.stop()

这将在5分钟后停止流。

相关问题