我正在尝试使用zeebe kafka连接器将JSON发送到Kafka中。Zeebe改变了JSON的格式。它将冒号(:)替换为equal(=),将双引号(“)替换为引号(')。例如:
{ "id": "1234"}
变成
{ 'id'='1234'}
但我不希望这种转变 www.example.com 能解决这个问题吗?
btxsgosb1#
我认为JSON有效负载似乎在发送到Kafka之前被ZeebeKafka连接器更改。双引号(“)更改为单引号('),冒号(:)替换为等号(=)。连接器的默认序列化设置可能是造成这种情况的原因。因此,您需要调整Kafka连接器的序列化设置,以阻止这种转换发生。您可以通过使用Kafka生产者配置的值来实现这一点。序列化器参数使用不改变JSON有效负载的序列化器。例如,org.apache.kafka.common.serialization.StringSerializer类可以用作值序列化器。JSON有效负载仅由此序列化器转换为字符串,而无需任何更改。可以将以下属性添加到Kafka生产者配置中以设置此序列化器:
org.apache.kafka.common.serialization.StringSerializer
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
利用一个唯一的序列化器来保持原始的JSON格式是另一种选择。实现org.apache.kafka.common.serialization.Serializer接口并重写serialize方法,以不做任何更改的方式将JSON有效负载作为字节数组返回,这将允许您设计自定义序列化器。定制序列化器完成后,您可以配置Kafka生产者将其用作值序列化器:
org.apache.kafka.common.serialization.Serializer
properties.put("value.serializer", "com.example.CustomJsonSerializer");
如果你想阻止它改变JSON负载,你必须改变连接器使用的Kafka生产者的序列化选项。使用一个自定义序列化器来保持原始的JSON格式,或者使用org.apache.kafka.common.serialization.StringSerializer作为值序列化器。
1条答案
按热度按时间btxsgosb1#
我认为JSON有效负载似乎在发送到Kafka之前被ZeebeKafka连接器更改。双引号(“)更改为单引号('),冒号(:)替换为等号(=)。连接器的默认序列化设置可能是造成这种情况的原因。
因此,您需要调整Kafka连接器的序列化设置,以阻止这种转换发生。您可以通过使用Kafka生产者配置的值来实现这一点。序列化器参数使用不改变JSON有效负载的序列化器。
例如,
org.apache.kafka.common.serialization.StringSerializer
类可以用作值序列化器。JSON有效负载仅由此序列化器转换为字符串,而无需任何更改。可以将以下属性添加到Kafka生产者配置中以设置此序列化器:利用一个唯一的序列化器来保持原始的JSON格式是另一种选择。实现
org.apache.kafka.common.serialization.Serializer
接口并重写serialize方法,以不做任何更改的方式将JSON有效负载作为字节数组返回,这将允许您设计自定义序列化器。定制序列化器完成后,您可以配置Kafka生产者将其用作值序列化器:
如果你想阻止它改变JSON负载,你必须改变连接器使用的Kafka生产者的序列化选项。使用一个自定义序列化器来保持原始的JSON格式,或者使用
org.apache.kafka.common.serialization.StringSerializer
作为值序列化器。