我正在创建一个基于Kafka的Flink流应用程序,并尝试创建一个关联的KafkaSource
连接器以读取Kafka数据。
例如:
final KafkaSource<String> source = KafkaSource.<String>builder()
// standard source builder setters
// ...
.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "truststore.jks")
.build();
truststore.jks
文件是在应用程序执行之前在本地作业管理器节点上创建的,我已经验证了它的存在和正确填充。我的问题是,在分布式Flink应用程序中,这个truststore.jks
不会自动存在于任务工作节点上,因此上面的代码在执行时会导致FileNotFoundException
。
我尝试过:
- 使用
env.registerCacheFile
和getRuntimeContext().getDistributedCache().getFile()
将文件分发到所有节点,但是由于正在构建图形并且应用程序尚未运行,因此RuntimeContext在此阶段不可用。 - 提供信任库的base64参数表示,并手动将其转换为.jks格式。我需要某种“预初始化”
KafkaSource
挂钩来完成此操作,但在文档中没有找到任何此类功能。 - 使用外部数据存储,比如s3,并从那里检索文件。就我所知,内部Kafka消费者不支持非本地文件系统,所以我仍然需要一些预初始化方法来在每个任务节点上本地检索文件。
在源初始化过程中,使此文件可供任务工作者节点使用的最佳方法是什么?
我以前读过类似的问题贴在这里:
- 如上所述,在应用程序中,此时我无法访问
RuntimeContext
。
- 这将把truststore作为一个base64编码的字符串参数注入。我可以这样做,但是由于内部Kafka消费者需要一个文件,所以在消费者初始化之前,我会遇到将参数转换为.jks格式的问题。我看不到在文档中为
KafkaSource
注册“预初始化”钩子的方法。
1条答案
按热度按时间xfb7svmp1#
更新日期:
我可以通过使用
ssl.truststore.certificates
配置字段来解决这个问题,这允许我提供底层truststore.jks
证书的base64编码表示,而不是本地文件路径。[我还必须将我的
kafka-clients
依赖项更新为2.7.x
+,因为此配置在旧版本的库中不可用]