为什么kafka消费者在poll()时不使用jdk interruptedexception?

xghobddn  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(350)

我注意到Kafka消费者主张使用 wakeup() 和自定义布尔值关闭kafkaconsumer:

public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;

     public KafkaConsumerRunner(KafkaConsumer consumer) {
       this.consumer = consumer;
     }

     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }

     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }

为什么Kafka不 poll() 方法抛出jdk interruptedexception,那么我们只需处理interruptedexception,就可以避免使用以下两个步骤编写shutdown方法:

closed.set(true);
         consumer.wakeup();

Kafka解释说:
请注意,虽然可以使用线程中断而不是wakeup()来中止阻塞操作(在这种情况下,会引发interruptexception),但我们不鼓励使用它们,因为它们可能会导致终止使用者的完全关闭。中断主要支持那些不可能使用wakeup()的情况,例如当使用者线程由不知道kafka客户机的代码管理时。
但事实上,我并不知道它的意义。有人能解释得更清楚些吗?
除此之外, poll() 投掷 org.apache.kafka.common.errors.InterruptException ,这是jdk interruptedexception的未检查 Package 。为什么要使用这个新的异常呢?
任何建议都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题