kafka connect是否支持枚举?

myss37ts  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(306)

kafka connect是否支持枚举字段?如果不是,通常的解决方法是什么?我在这里查看kafka 2.6.0 connectschema api:https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/data/connectschema.html
我尝试通过使用合流模式注册表(使用avro)来遵循最佳实践,但似乎无法让我的自定义源连接器生成包含枚举的模式以匹配现有模式(输出主题除了连接器之外还有其他生产者)。解决方法是简单地使用字符串,但这会破坏模式的整体意义,不是吗?

cu6pst1q

cu6pst1q1#

没有一种方法可以在一般情况下工作,但是您可以在avro的情况下使用特定于转换器的特殊配置,然后还必须通过enum字段模式属性提供特殊提示。我能够使用自定义连接转换提供提示。
使用以下配置连接转换器:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://registry:8081",
"value.converter.enhanced.avro.schema.support":  true,
"value.converter.connect.meta.data": false,
"transforms": "alarms",
"transforms.alarms.type": "org.jlab.kafka.connect.transforms.EpicsToAlarm$Value"

然后,自定义转换包含:

final Schema prioritySchema = SchemaBuilder
            .string()
            .doc("Alarm severity organized as a way for operators to prioritize which alarms to take action on first")
            .parameter("io.confluent.connect.avro.enum.doc.AlarmPriority", "Enumeration of possible alarm priorities")
            .parameter("io.confluent.connect.avro.Enum", "org.jlab.kafka.alarms.AlarmPriority")
            .parameter("io.confluent.connect.avro.Enum.1", "P1_LIFE")
            .parameter("io.confluent.connect.avro.Enum.2", "P2_PROPERTY")
            .parameter("io.confluent.connect.avro.Enum.3", "P3_PRODUCTIVITY")
            .parameter("io.confluent.connect.avro.Enum.4", "P4_DIAGNOSTIC")
            .build();

全变换源

相关问题