配置kafka streams应用程序以使用合流模式注册表

llmtgqce  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(211)

我有一个kafka streams java应用程序,它处理一个avro-kafka主题,对数据进行一些更改,并将avro中处理的结果放入另一个主题。
为此,我使用了avro genericord serde,这样我就可以反序列化每个记录,修改所需的字段,然后将其再次序列化为avro。
这是serde声明,配置为使用schema注册表http://localhost:8082:

Map<String, String> serdeConfig;
serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8082");
Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);
valueGenericAvroSerde.configure(serdeConfig, false);

使用serde配置的abobe将结果放入目标主题:

//Process Stream
ObfuscateTopicProcessor obfuscateTopicProcessor = new ObfuscateTopicProcessor(environment, appName);
StreamsBuilder builder = new StreamsBuilder();
obfuscateTopicProcessor.convert(builder, topics, fieldList)
        .to((k, v, r) -> r.topic() + topicSufix, Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));
//Run Stream
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);

问题是:当我运行应用程序时,它工作得很好,但是它会在schema注册表中生成新的已处理主题模式http://localhost:8081.
这是应用程序的日志,它应该在中创建新的主题模式http://localhost:8082:

[2019-12-10 10:46:51,190] [INFO] KafkaAvroSerializerConfig values:
    bearer.auth.token = [hidden]
    schema.registry.url = [http://localhost:8082]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy  (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)

我是否缺少任何重要的配置参数?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题