什么命令可以优雅地停止/暂停Kafka制作者的流式数据?
oxf4rvwz1#
要阻止生产者流式传输数据,您可以停止该应用程序(终止它的进程,或者干脆不向kafka代理发送任何数据。)不过,您可以优雅地阻止Kafka消费者使用流式数据。根据kafka权威指南,如果要退出无限轮询循环,可以调用consumer.wake()方法。当您决定退出轮询循环时,您将需要另一个线程来调用consumer.wakeup()。如果在主线程中运行consumer循环,可以通过shutdownhook完成。请注意,consumer.wakeup()是唯一可以从其他线程安全调用的consumer方法。调用wakeup将导致poll()退出并出现wakeupexception,或者如果在线程未等待poll时调用consumer.wakeup(),则在调用poll()时,将在下一次迭代中引发异常。不需要处理wakeupexception,但在退出线程之前,必须调用consumer.close()。代码示例
1条答案
按热度按时间oxf4rvwz1#
要阻止生产者流式传输数据,您可以停止该应用程序(终止它的进程,或者干脆不向kafka代理发送任何数据。)
不过,您可以优雅地阻止Kafka消费者使用流式数据。
根据kafka权威指南,如果要退出无限轮询循环,可以调用consumer.wake()方法。
当您决定退出轮询循环时,您将需要另一个线程来调用consumer.wakeup()。如果在主线程中运行consumer循环,可以通过shutdownhook完成。请注意,consumer.wakeup()是唯一可以从其他线程安全调用的consumer方法。调用wakeup将导致poll()退出并出现wakeupexception,或者如果在线程未等待poll时调用consumer.wakeup(),则在调用poll()时,将在下一次迭代中引发异常。不需要处理wakeupexception,但在退出线程之前,必须调用consumer.close()。
代码示例