如何处理未授权访问的主题在Kafka河

hgc7kmma  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(628)

情况如下。
我们在kafka代理中设置了ssl+acl。
我们正在设置流,它从两个主题读取消息:

KStream<String, String> stringInput 
    = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );

stringInput
    .filter( streamFilter::passOrFilterMessages )
    .map( processor )
    .to( outTopicName );

这是做了两次(在循环)。然后设置常规错误处理程序:

streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
                    synchronized ( this ) {
                        LOG.fatal( ... );
                        this.stop();
                    }
                }
        );

问题如下。例如,在一个主题中,证书不再有效。流正在引发未授权访问主题的异常。。。到现在为止,一直都还不错。
但是异常是由常规错误处理程序处理的,因此即使第二个主题没有问题,整个应用程序也会停止。
问题是,如何处理每个主题的异常?如何避免由于某个主题的授权有问题而导致整个应用程序在某个时刻停止的情况?
我明白,如果代理不可用,那么完整的应用程序可能会停止。但是如果只有一个主题不可用,那么单个流应该停止,并且不能完成应用,或者?

tpxzln5u

tpxzln5u1#

根据设计,kafka streams将拓扑视为一个拓扑,无法区分这两个部分。对于您的具体情况,当您循环并构建到独立的管道时,您可以运行两个 KafkaStreams 并行的示例(在同一个应用程序/jvm中),将两者相互隔离。因此,如果一个失败,另一个不会受到影响。你需要使用两个不同的 application.id 对于这两种情况。

相关问题