我想在Spring Cloud Stream Producers、Consumers和KStreams中自定义Avro模式主题的命名策略。
这将在Kafka中使用属性key.subject.name.strategy
和value.subject.name.strategy
-> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy完成
在一个本地的Kafka Producer中,这是可行的:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
但是我在Spring Cloud溪里找不到这个方法。到目前为止,我已经在一个生产者中尝试过了:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
显然,Spring Cloud使用了自己的主题命名策略,接口为org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
,只有一个子类:DefaultSubjectNamingStrategy
。
是否有声明式的方式来配置value.subject.name.strategy
,或者我们是否需要提供自己的org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
实现和spring.cloud.stream.schema.avro.subject-naming-strategy
属性?
2条答案
按热度按时间7kqas0il1#
正如在另一个答案中指出的那样,有一个专用的属性
spring.cloud.stream.schema.avro.subjectNamingStrategy
,它允许为Kafka生产者设置不同的命名策略**。我贡献了
org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
,它提供了开箱即用的功能。在Kafka Streams和本机序列化/非序列化(Spring Cloud Streams 3.0.0+的默认行为)的情况下,您必须使用Confluent的实现(
io.confluent.kafka.serializers.subject.RecordNameStrategy
)和本机属性:wgxvkvu92#
您可以在属性中将其声明为
其中MyStrategy是接口的实现。例如