我在弄清楚如何测试使用avro作为消息格式和(合流)模式注册表的springcloudstreamkafka流应用程序时遇到了困难。
配置可以如下所示:
spring:
application:
name: shipping-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: shipping-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
order:
destination: order
output:
destination: order
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug
笔记:
它正在使用本机序列化/反序列化。
测试框架:JUnit5
我想关于kafka代理,我应该使用一个嵌入式kafkabroker bean,但是正如您所看到的,它还依赖于一个应该以某种方式进行模拟的模式注册表。怎样?
1条答案
按热度按时间woobm2wo1#
解决这个问题确实很痛苦,但最后我还是通过流畅的Kafka流测试成功了:
额外依赖项:
关键是将必要的配置设置为系统属性。为此,我创建了一个单独的测试配置类:
最后是test类,您现在可以在其中生成和使用avro消息,并使用kstream处理它们并利用模拟模式注册表: