如何在Spring Cloud Stream Kafka producer、consumer和KStreams中为schema配置value.subject.name.strategy?

e4eetjau  于 2023-10-15  发布在  Apache
关注(0)|答案(2)|浏览(98)

我想在Spring Cloud Stream Producers、Consumers和KStreams中自定义Avro模式主题的命名策略。
这将在Kafka中使用属性key.subject.name.strategyvalue.subject.name.strategy-> https://docs.confluent.io/current/schema-registry/serializer-formatter.html#subject-name-strategy完成
在一个本地的Kafka Producer中,这是可行的:

private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }

但是我在Spring Cloud溪里找不到这个方法。到目前为止,我已经在一个生产者中尝试过了:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

显然,Spring Cloud使用了自己的主题命名策略,接口为org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy,只有一个子类:DefaultSubjectNamingStrategy
是否有声明式的方式来配置value.subject.name.strategy,或者我们是否需要提供自己的org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy实现和spring.cloud.stream.schema.avro.subject-naming-strategy属性?

7kqas0il

7kqas0il1#

正如在另一个答案中指出的那样,有一个专用的属性spring.cloud.stream.schema.avro.subjectNamingStrategy,它允许为Kafka生产者设置不同的命名策略**。
我贡献了org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy,它提供了开箱即用的功能。
Kafka Streams和本机序列化/非序列化(Spring Cloud Streams 3.0.0+的默认行为)的情况下,您必须使用Confluent的实现(io.confluent.kafka.serializers.subject.RecordNameStrategy)和本机属性:

spring:
  application:
    name: shipping-service
  cloud:
    stream:
      ...
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              ...
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
wgxvkvu9

wgxvkvu92#

您可以在属性中将其声明为

spring.cloud.stream.schema.avro.subjectNamingStrategy=MyStrategy

其中MyStrategy是接口的实现。例如

object MyStrategy: SubjectNamingStrategy {
   override fun toSubject(schema: Schema): String = schema.fullName
}

相关问题