Kafka连接不与主题策略工作

3df52oht  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(382)

上下文
我给几个Kafka连接的小连接器编了代码。一个每秒只生成随机数据,另一个在控制台中记录数据。它们与模式注册表集成,因此数据通过avro序列化。
我使用landoop提供的fastdatadevdocker映像将它们部署到本地kafka环境中
基本设置工作并每秒生成一条记录的消息
但是,我想更改主题名策略。默认的一个生成两个主题:
${topic}-key ${topic}-value 根据我的用例,我将需要生成具有不同模式的事件,这些事件将以相同的主题结束。因此,我需要的主题名称是:
${topic}-${keyRecordName} ${topic}-${valueRecordName} 根据文件,我的需求符合TopicRecordname战略
我试过什么
我创造了 avroData 用于将值发送到连接的对象:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

然后用它来创建 SourceRecord 响应对象
文档指出,为了使用kafka connect中的schema注册表,我必须在connector config中设置一些属性。因此,当我创建它时,我会添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题
连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key 以及 ${topic}-value 学科。
问题
Kafka连接应该支持不同的主题策略。我设法解决了这个问题,写了我自己的版本 AvroConverter 硬编码的主题策略是我需要的。但是,这看起来不是一个好方法,并且在尝试使用sink kafka连接器使用数据时也带来了问题。我复制了这个主题,所以有一个旧名字的版本( ${topic}-key )而且很有效
将主题策略指定给Kafka连接的正确设置是什么?

ezykj2lf

ezykj2lf1#

你错过了比赛 key.converter 以及 value.converter 前缀,用于传递给converter的配置。所以不是:

key.subject.name.strategy
value.subject.name.strategy

你想要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源https://docs.confluent.io/current/connect/managing/configuring.html:
要将配置参数传递给键和值转换器,请在它们前面加上 key.converter. 或者 value.converter. 正如您在定义默认转换器时在工作配置中所做的那样。请注意,这些仅在中指定了相应的转换器配置时使用 key.converter 或者 value.converter 属性。

相关问题