Kafka:将主题a复制到主题b,同时对记录应用转换

r9f1avp5  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(431)

我需要将记录从集群a上的一个主题镜像到集群b上的一个主题,同时在记录上添加一个字段作为代理(例如。 InsertField ).
我不控制群集a(但可能需要更改),并且完全控制群集b。
我知道集群a正在发送序列化的json。
我正在使用mirrormakerapi和kafka connect来进行镜像,我正在尝试使用 InsertField 转换以在被代理时在记录上添加数据。
我的配置如下:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
topics=.*
source.cluster.alias=upstream
source.cluster.bootstrap.servers=source:9092
target.cluster.bootstrap.servers=target:9092

# ByteArrayConverter to avoid MirrorMaker to re-encode messages

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

transforms=InsertSource1
transforms.InsertSource1.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource1.static.field=test_inser
transforms.InsertSource1.static.value=test_value
name=somerandomname

此代码将失败,并显示一个错误:
org.apache.kafka.connect.errors.dataexception:仅[字段插入]支持结构对象
有没有一种不用编写自定义转换就可以实现的方法(我使用的是python,对java不熟悉)
谢谢

yizd12fk

yizd12fk1#

在当前版本的 Apache Kafka (2.6.0),不能将插入字段单消息转换(smt)应用于 MirrorMaker 2.0 记录。
解释
这个 MirrorMaker 2.0 基于 Kafka Connect 框架,并在内部设置mirrormaker 2.0驱动程序 MirrorSourceConnector .
源连接器在轮询记录后立即应用smt(没有转换器(例如。 ByteArrayConverter 或者 JsonConverter )在这个步骤中:它们在应用smt之后使用)。
这个 SourceRecord 值表示为字节数组 BYTES_SCHEMA 架构。同时 InsertField 转型需要 Type.STRUCT 对于具有架构的记录。
所以,既然记录不能确定为 Struct ,未应用转换。
参考文献
kip-382:mirrormaker 2.0
如何在kafka connect中使用单个消息转换
额外资源
mirrormaker 2.0的docker composePlayground

ccgok5k5

ccgok5k52#

如前所述,字节数组转换器没有结构/模式信息,因此无法使用您正在使用的转换(添加字段)。
但是,这并不意味着不能使用任何变换
如果要发送json消息,则必须发送模式和负载信息。

相关问题