我使用Spring Cloud Stream和Kafka、avro、schema registry。我在函数风格下进行React式编程。我想生成这样的消息。GenericMessage [payload={"id": "efb90cd6-e022-4d82-9898-6b78114cfb01", "type": "FirstRankPaymentAgreed",...}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]
但它产生的消息是这样的。GenericMessage [payload=byte[2151], headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]
spring-cloud-stream-schema 2.2.1.RELEASE
spring-cloud-stream 3.2.2
spring-cloud-starter-stream-kafka 3.2.5
spring-cloud-stream-binder-kafka-streams 3.2.5
spring-cloud-function-context 3.2.1
kafka-avro-serializer 5.3.0
spring-kafka 2.9.0
org.apache.avro.avro 1.11.1
我使用的是React式功能文体。
Function<Flux<Message<Object>>, Flux<Message>> handler()
return Mono.just(Message<FirstRankPaymentAgreed> messageMessageBuilder = MessageBuilder.withPayload((FirstRankPaymentAgreed) message.getPayload()).build());
此生成器的结果是收到的消息:GenericMessage [payload=byte[1056], headers={contentType=application/json, id=7d3b65c1-11d8-0fb2-a277-0603f58fd516, timestamp=1672174971194}]
在有效载荷中,我们有字节数组,而不是JSON。
我想要类似于GenericMessage [payload={"id": "254335d0-b631-454e-98de-d2d5129af4c0", "type": "ObjectClass", "delta"...
东西
cloud:
stream:
function:
definition: dispatchConsumerFromTempoComposerEvent
bindings:
dispatchConsumerFromTempoComposerEvent-in-0:
destination: tempo-composer-event
dispatchConsumerFromTempoComposerEvent-out-0:
destination: tempo-composer-event
contentType: application/json --> i try to add this
kafka:
binder:
auto-create-topics: false
consumer-properties:
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true
producer-properties:
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy --> i try to add this
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
我想将项目迁移到Spring Cloud Stream。“遗留”代码:
一个一个三个一个一个一个一个一个四个一个一个一个一个一个五个一个
此遗留代码生成
GenericMessage [payload={"id": "efb90cd6-e022-4d82-9898-6b78114cfb01", "type": "FirstRankPaymentAgreed",...}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]
负载是JSON格式的。在我迁移spring云流之后,我有了[payload=byte[2151]...
1条答案
按热度按时间vwkv1x7d1#
我只是简单地使用模式注册表。而是使用消息有效负载检索类和获取模式,以将字节数组转换为最终的avro模式。我使用模式注册表和ID。忽略字节并获取模式ID,以找到正确的模式。您可以只注解application.class并添加@EnableSchemaRegistry。您的签名函数如下所示。
读取字节数组,获取schema的id,在schema注册表中找到正确的schema,将字节数组转换为正确的类。