如何完成kafka消费者安全?(在shutdownhook中调用thread#join有什么意义?)

2izufjch  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(686)

我正在读这篇文章
下面是完成用户线程的代码:

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup(); 1
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

据我所知,shutdownhook在所有非守护进程线程都完成但进程被os破坏之前调用。
1在我看来,mainthread.join()没用。主线程始终会在执行shutdownhook时完成。是正确的还是我误解了什么?
2事实上我不明白为什么我们要等待主信号?我们需要等待关闭方法执行吗?

p、 s。

本书提供了以下主要方法代码:

try {
            // looping until ctrl-c, the shutdown hook will cleanup on exit
            while (true) {
                ConsumerRecords<String, String> records =
                  movingAvg.consumer.poll(1000);
                System.out.println(System.currentTimeMillis() + "--  waiting for data...");
                for (ConsumerRecord<String, String> record :
                  records) {
                    System.out.printf("offset = %d, key = %s,
                      value = %s\n",
                      record.offset(), record.key(),
                      record.value());
                }
                for (TopicPartition tp: consumer.assignment())
                    System.out.println("Committing offset at position:" + consumer.position(tp));
                movingAvg.consumer.commitSync();
            }
        } catch (WakeupException e) {
            // ignore for shutdown 2
        } finally {
            consumer.close(); 3
            System.out.println("Closed consumer and we are done");
        }
    }
bejyjqdl

bejyjqdl1#

你知道吗 consumer.wakeup() 中断当前用户的操作(可能是长时间运行的(例如轮询)或甚至被阻止的操作(如果 beginningOffsets(...) .
这个 mainThread.join() 放在那里是为了确保主线程实际完成,并且在唤醒后不会在处理过程中关闭。请记住,shutdownhook也负责处理中断,而不仅仅是普通的程序关闭。
因此,如果您用ctrl-c中断:

1. shutdown hook gets called
2. main thread is still running, most often waiting for data in `poll`
3. shutdown hook `wakeup`-s the main thread
4. main thread enters the exception handler, breaks the loop
5. main thread closes the consumer with `.close()`
6. shutdown hook waits for 5. and finishes

如果不等待,您可能没有执行步骤4和5中的耗电元件关闭步骤。

相关问题