spark streaming kafka-如何在处理所有现有消息后停止流媒体(优雅地)

9jyewag0  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(241)

这就是我想做的
从Kafka主题流式传输数据,该主题不断地获取数据。每天运行作业两次,以处理该点上现有的所有数据并停止流。
所以我最初在查询上放置并调用stop,但它抛出了“timeoutexception”
然后我尝试动态增加超时,但是现在我得到java.io.ioexception:由java.lang.interruptedexception引起
那么,有没有什么方法可以在没有任何异常的情况下优雅地停止流呢?
下面是我当前的代码(part),它抛出了中断的异常

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_SERVERS"])
    .option("subscribe", config.kafka.topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 25000)
    .load()
)

# <do some processing and save the data>

def save_batch(batch_df, batch_id):
    pass

query = df.writeStream.foreachBatch(save_batch).start(
    outputMode="append",
    checkpointLocation=os.path.join(checkpoint_path, config.kafka.topic),
)

while query.isActive:
    progress = query.lastProgress
    if progress and progress["numInputRows"] < 25000 * 0.9:
        timeout = sum(progress["durationMs"].values())
        timeout = min(5 * 60 * 1000, max(15000, timeout))
        spark.conf.set("spark.sql.streaming.stopTimeout", str(timeout))
        stream_query.stop()
        break
    time.sleep(10)

spark版本:2.4.5 scala版本:2.1.1

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题