java 如何使用zeebe kafka连接器将json发送到Kafka?

rdlzhqv9  于 2023-04-28  发布在  Java
关注(0)|答案(1)|浏览(127)

我正在尝试使用zeebe kafka连接器将JSON发送到Kafka中。Zeebe改变了JSON的格式。它将冒号(:)替换为equal(=),将双引号(“)替换为引号(')。
例如:

{ "id": "1234"}

变成

{ 'id'='1234'}

但我不希望这种转变 www.example.com 能解决这个问题吗?

btxsgosb

btxsgosb1#

我认为JSON有效负载似乎在发送到Kafka之前被ZeebeKafka连接器更改。双引号(“)更改为单引号('),冒号(:)替换为等号(=)。连接器的默认序列化设置可能是造成这种情况的原因。
因此,您需要调整Kafka连接器的序列化设置,以阻止这种转换发生。您可以通过使用Kafka生产者配置的值来实现这一点。序列化器参数使用不改变JSON有效负载的序列化器。
例如,org.apache.kafka.common.serialization.StringSerializer类可以用作值序列化器。JSON有效负载仅由此序列化器转换为字符串,而无需任何更改。可以将以下属性添加到Kafka生产者配置中以设置此序列化器:

properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

利用一个唯一的序列化器来保持原始的JSON格式是另一种选择。实现org.apache.kafka.common.serialization.Serializer接口并重写serialize方法,以不做任何更改的方式将JSON有效负载作为字节数组返回,这将允许您设计自定义序列化器。
定制序列化器完成后,您可以配置Kafka生产者将其用作值序列化器:

properties.put("value.serializer", "com.example.CustomJsonSerializer");

如果你想阻止它改变JSON负载,你必须改变连接器使用的Kafka生产者的序列化选项。使用一个自定义序列化器来保持原始的JSON格式,或者使用org.apache.kafka.common.serialization.StringSerializer作为值序列化器。

相关问题