librdkafka的自定义插件,注册oauthbearer\u token\u refresh\u cb未被调用

xcitsw88  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(357)

我正在尝试为它编写自定义插件 librdkafka 它将提供 oauthbearer_token_refresh_cb .
在我们公司,我们使用kafka和定制的oauthbearer sasl机制,在java/kotlin中运行良好,在.net中也运行良好。下一步是实现相同的 librdkafka 因为我们需要它来实现clickhouse db的kafka连接器 librdkafka .
我尝试了以下插件驱动:

自定义oauth.c


# include <librdkafka/rdkafka.h>

# include <stdio.h>

void oauth_refresh (rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) {
        char err_msg[2048];
        const char *token;
        long long expiry = 0;
        printf("============== Initializing oauthbearer config: %s\n", oauthbearer_config);
        token = "<custom logic implementation>";
        rd_kafka_oauthbearer_set_token(rk, token, expiry, "", NULL, 0, err_msg, sizeof(err_msg));
        printf("============== Oauthbearer token set: %s\n", token);
}

void conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size) {
        printf("============== registering callback\n");
        rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf, oauth_refresh);
        if (rd_kafka_last_error()) {
                printf("============== got some error: %d\n", rd_kafka_last_error());
        }
        printf("============== callback registered\n");
}

编译时使用:

gcc -c -fPIC custom-oauth.c -o oauth-cb.o

生成共享库:

gcc -shared oauth-cb.o -lrdkafka -o lib-oauth-cb.so

试试看吧 kafkacat 签署人:

kafkacat -L \
    -X debug="all" \
    -X log_level=0 \
    -b my-broker-1:9094 \
    -X security.protocol=SASL_SSL \
    -X ssl.ca.location=/path-to-my-cer.pem \
    -X sasl.mechanisms=OAUTHBEARER \
    -X plugin.library.paths=lib-oauth-cb.so \
    -X sasl.oauthbearer.config="my-custom-oauth-config"

结果:

============== registering callback
============== callback registered
% ERROR: No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so)

如果我把发生在 rd_kafka_conf_set_oauthbearer_token_refresh_cb 那我就不明白了 % ERROR: No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so) 但这样也不起作用,而且我设置oauth承载令牌的回调从未被调用。这是输出:

============== registering callback
%7|1605365045.759|SASL|rdkafka#producer-1| [thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
%7|1605365045.759|OPENSSL|rdkafka#producer-1| [thrd:app]: Using OpenSSL version OpenSSL 1.1.1g  21 Apr 2020 (0x1010107f, librdkafka built with 0x1010107f)
%7|1605365045.762|SSL|rdkafka#producer-1| [thrd:app]: Loading CA certificate(s) from file /path-to-my-cer.pem
%7|1605365045.763|WAKEUPFD|rdkafka#producer-1| [thrd:app]: sasl_ssl://my-broker-1:9094/bootstrap: Enabled low-latency ops queue wake-ups
%7|1605365045.763|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1605365045.763|BROKER|rdkafka#producer-1| [thrd:app]: sasl_ssl://my-broker-1:9094/bootstrap: Added new broker with NodeId -1
%7|1605365045.763|BRKMAIN|rdkafka#producer-1| [thrd:sasl_ssl://my-broker-1:9094/bootstrap]: sasl_ssl://my-broker-1:9094/bootstrap: Enter main broker thread
%7|1605365045.763|CONNECT|rdkafka#producer-1| [thrd:app]: sasl_ssl://my-broker-1:9094/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1605365045.763|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://my-broker-1:9094/bootstrap]: sasl_ssl://my-broker-1:9094/bootstrap: Received CONNECT op
%7|1605365045.763|STATE|rdkafka#producer-1| [thrd:sasl_ssl://my-broker-1:9094/bootstrap]: sasl_ssl://my-broker-1:9094/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1605365045.763|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://my-broker-1:9094/bootstrap]: Broadcasting state change
%7|1605365045.763|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, CC CXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%7|1605365045.763|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1605365046.763|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1605365047.763|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1605365048.763|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1605365049.764|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1605365050.766|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1605365050.766|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
% ERROR: Failed to acquire metadata: Local: Broker transport failure

注意事项:

这个kafka集群有3个不同的侦听器
端口9092纯文本协议(试用过kafkacat,效果不错)
端口9093 ssl协议(试用过kafkacat,效果不错)
端口9094 sasl\ U ssl协议,oauthbearer作为sasl机制
我已安装 librdkafka 以及 kafkacat 使用 brew 在我的mac电脑上,尽我所知,librdkafka的版本是 1.5.0 ##问题
为什么我要回电话 oauth_refresh 从未打过电话(没有 ============== Initializing oauthbearer config ... 印在 stdout )
为什么我会出错 No such configuration property: "plugin.library.paths" (plugin lib-oauth-cb.so) 如果有的话 printf 之后 rd_kafka_conf_set_oauthbearer_token_refresh_cb 只要 rd_kafka_conf_set_oauthbearer_token_refresh_cb 被召唤 printf 即使多次设置也不会导致此错误:
printf rd_kafka_conf_set_oauthbearer_token_refresh_cb printf rd_kafka_conf_set_oauthbearer_token_refresh_cb 它的副作用是什么 printf 导致这个错误?
问题的这一部分不那么重要,因为我不打算保留那些印刷品 1) 事项

目标

我的目标是让我们的oauthbearer方案有一个小的可插入库,可以插入到任何使用 librdkafka (最终目标是clickhouse的Kafka连接器)
我曾经 kafkacat 只是因为这是一个很容易实验的工具
关键点是可插入性,这样就不需要重新编译一些使用librdkafka的应用程序,只需要使用定制的刷新oauth令牌逻辑即可。

编辑

我有几个问题:
确认人签字:
需要: rd_kafka_resp_err_t conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size) 而不是: void conf_init(rd_kafka_conf_t *conf, const char *path, char *errstr, size_t errstr_size) 设置oauthbearer令牌时,需要 nowMs + someTTL 相反 0 在我的例子中
我最终使用rdkafka\u example.c而不是installed kafkacat 自从最新版本的Kafka猫,我安装了 brew1.5.0 当和这个老家伙赛跑的时候 kafkacat 我犯了以下错误 kafkacat(76875,0x11b279dc0) malloc:***error for object 0x10bad7bd0: pointer being freed was not allocated kafkacat(76875,0x11b279dc0) malloc:***set a breakpoint in malloc_error_break to debug 我让它在消费者模式下工作 -C 但不是在打印元数据模式下 -L 因为与 -L 模式不调用 rd_kafka_poll 所以我的回电一直没人打。
最后我加了 rd_kafka_poll(rk, 5000); 在rdkafka_example.c中 -L 开关已启用

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题