我有一个flink应用程序,它读取一个Kafka主题。我试图阻止flinkkafkaconsumer停止拉邮件。我的最终目标是构建一种方法来不时地部署我的flink应用程序,而不必停机—如何在不停机的情况下部署新作业。
我尝试过使用“kafkaconsumer.close()”,但是没有用。我试图阻止消费者在不终止整个作业的情况下获取新消息,同时我将上载一个新作业,其中包含从同一主题读取的更新代码。我该怎么做?
我有一个flink应用程序,它读取一个Kafka主题。我试图阻止flinkkafkaconsumer停止拉邮件。我的最终目标是构建一种方法来不时地部署我的flink应用程序,而不必停机—如何在不停机的情况下部署新作业。
我尝试过使用“kafkaconsumer.close()”,但是没有用。我试图阻止消费者在不终止整个作业的情况下获取新消息,同时我将上载一个新作业,其中包含从同一主题读取的更新代码。我该怎么做?
1条答案
按热度按时间z9smfwbn1#
有没有可能在Kafka主题的所有分区上发送一个特殊的“switch”消息?然后你就可以重写了
isEndOfStream(T nextElement)
在你的Kafka里DeserializationSchema
,并使新作业示例在最后一条切换消息之后开始工作