我有一个kafka streams java应用程序,它处理一个avro-kafka主题,对数据进行一些更改,并将avro中处理的结果放入另一个主题。
为此,我使用了avro genericord serde,这样我就可以反序列化每个记录,修改所需的字段,然后将其再次序列化为avro。
这是serde声明,配置为使用schema注册表http://localhost:8082:
Map<String, String> serdeConfig;
serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8082");
Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);
valueGenericAvroSerde.configure(serdeConfig, false);
使用serde配置的abobe将结果放入目标主题:
//Process Stream
ObfuscateTopicProcessor obfuscateTopicProcessor = new ObfuscateTopicProcessor(environment, appName);
StreamsBuilder builder = new StreamsBuilder();
obfuscateTopicProcessor.convert(builder, topics, fieldList)
.to((k, v, r) -> r.topic() + topicSufix, Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));
//Run Stream
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
问题是:当我运行应用程序时,它工作得很好,但是它会在schema注册表中生成新的已处理主题模式http://localhost:8081.
这是应用程序的日志,它应该在中创建新的主题模式http://localhost:8082:
[2019-12-10 10:46:51,190] [INFO] KafkaAvroSerializerConfig values:
bearer.auth.token = [hidden]
schema.registry.url = [http://localhost:8082]
basic.auth.user.info = [hidden]
auto.register.schemas = true
max.schemas.per.subject = 1000
basic.auth.credentials.source = URL
schema.registry.basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy (io.confluent.kafka.serializers.KafkaAvroSerializerConfig)
我是否缺少任何重要的配置参数?
暂无答案!
目前还没有任何答案,快来回答吧!