我注意到Kafka消费者主张使用 wakeup()
和自定义布尔值关闭kafkaconsumer:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
为什么Kafka不 poll()
方法抛出jdk interruptedexception,那么我们只需处理interruptedexception,就可以避免使用以下两个步骤编写shutdown方法:
closed.set(true);
consumer.wakeup();
Kafka解释说:
请注意,虽然可以使用线程中断而不是wakeup()来中止阻塞操作(在这种情况下,会引发interruptexception),但我们不鼓励使用它们,因为它们可能会导致终止使用者的完全关闭。中断主要支持那些不可能使用wakeup()的情况,例如当使用者线程由不知道kafka客户机的代码管理时。
但事实上,我并不知道它的意义。有人能解释得更清楚些吗?
除此之外, poll()
投掷 org.apache.kafka.common.errors.InterruptException
,这是jdk interruptedexception的未检查 Package 。为什么要使用这个新的异常呢?
任何建议都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!