这就是我想做的
从Kafka主题流式传输数据,该主题不断地获取数据。每天运行作业两次,以处理该点上现有的所有数据并停止流。
所以我最初在查询上放置并调用stop,但它抛出了“timeoutexception”
然后我尝试动态增加超时,但是现在我得到java.io.ioexception:由java.lang.interruptedexception引起
那么,有没有什么方法可以在没有任何异常的情况下优雅地停止流呢?
下面是我当前的代码(part),它抛出了中断的异常
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_SERVERS"])
.option("subscribe", config.kafka.topic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 25000)
.load()
)
# <do some processing and save the data>
def save_batch(batch_df, batch_id):
pass
query = df.writeStream.foreachBatch(save_batch).start(
outputMode="append",
checkpointLocation=os.path.join(checkpoint_path, config.kafka.topic),
)
while query.isActive:
progress = query.lastProgress
if progress and progress["numInputRows"] < 25000 * 0.9:
timeout = sum(progress["durationMs"].values())
timeout = min(5 * 60 * 1000, max(15000, timeout))
spark.conf.set("spark.sql.streaming.stopTimeout", str(timeout))
stream_query.stop()
break
time.sleep(10)
spark版本:2.4.5 scala版本:2.1.1
暂无答案!
目前还没有任何答案,快来回答吧!