上下文
我给几个Kafka连接的小连接器编了代码。一个每秒只生成随机数据,另一个在控制台中记录数据。它们与模式注册表集成,因此数据通过avro序列化。
我使用landoop提供的fastdatadevdocker映像将它们部署到本地kafka环境中
基本设置工作并每秒生成一条记录的消息
但是,我想更改主题名策略。默认的一个生成两个主题:
${topic}-key ${topic}-value
根据我的用例,我将需要生成具有不同模式的事件,这些事件将以相同的主题结束。因此,我需要的主题名称是:
${topic}-${keyRecordName} ${topic}-${valueRecordName}
根据文件,我的需求符合TopicRecordname战略
我试过什么
我创造了 avroData
用于将值发送到连接的对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后用它来创建 SourceRecord
响应对象
文档指出,为了使用kafka connect中的schema注册表,我必须在connector config中设置一些属性。因此,当我创建它时,我会添加它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key
以及 ${topic}-value
学科。
问题
Kafka连接应该支持不同的主题策略。我设法解决了这个问题,写了我自己的版本 AvroConverter
硬编码的主题策略是我需要的。但是,这看起来不是一个好方法,并且在尝试使用sink kafka连接器使用数据时也带来了问题。我复制了这个主题,所以有一个旧名字的版本( ${topic}-key
)而且很有效
将主题策略指定给Kafka连接的正确设置是什么?
1条答案
按热度按时间ezykj2lf1#
你错过了比赛
key.converter
以及value.converter
前缀,用于传递给converter的配置。所以不是:你想要:
来源https://docs.confluent.io/current/connect/managing/configuring.html:
要将配置参数传递给键和值转换器,请在它们前面加上
key.converter.
或者value.converter.
正如您在定义默认转换器时在工作配置中所做的那样。请注意,这些仅在中指定了相应的转换器配置时使用key.converter
或者value.converter
属性。