我需要将记录从集群a上的一个主题镜像到集群b上的一个主题,同时在记录上添加一个字段作为代理(例如。 InsertField
).
我不控制群集a(但可能需要更改),并且完全控制群集b。
我知道集群a正在发送序列化的json。
我正在使用mirrormakerapi和kafka connect来进行镜像,我正在尝试使用 InsertField
转换以在被代理时在记录上添加数据。
我的配置如下:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092
# ByteArrayConverter to avoid MirrorMaker to re-encode messages
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname
此代码将失败,并显示一个错误:
org.apache.kafka.connect.errors.dataexception:仅[字段插入]支持结构对象
有没有一种不用编写自定义转换就可以实现的方法(我使用的是python,对java不熟悉)
谢谢
2条答案
按热度按时间yizd12fk1#
在当前版本的
Apache Kafka
(2.6.0),不能将插入字段单消息转换(smt)应用于MirrorMaker 2.0
记录。解释
这个
MirrorMaker 2.0
基于Kafka Connect
框架,并在内部设置mirrormaker 2.0驱动程序MirrorSourceConnector
.源连接器在轮询记录后立即应用smt(没有转换器(例如。
ByteArrayConverter
或者JsonConverter
)在这个步骤中:它们在应用smt之后使用)。这个
SourceRecord
值表示为字节数组BYTES_SCHEMA
架构。同时InsertField
转型需要Type.STRUCT
对于具有架构的记录。所以,既然记录不能确定为
Struct
,未应用转换。参考文献
kip-382:mirrormaker 2.0
如何在kafka connect中使用单个消息转换
额外资源
mirrormaker 2.0的docker composePlayground
ccgok5k52#
如前所述,字节数组转换器没有结构/模式信息,因此无法使用您正在使用的转换(添加字段)。
但是,这并不意味着不能使用任何变换
如果要发送json消息,则必须发送模式和负载信息。