Spring Cloud Stream Kafka: I want to send Message<Object> but Spring sends Message<byte[]>; the GenericMessage payload is byte[] instead of JSON

k0pti3hp asked on 2023-01-12, in Apache

I'm using Spring Cloud Stream with Kafka, Avro, and the schema registry, and I'm writing reactive, function-style code. I want to produce a message like this:

GenericMessage [payload={"id": "efb90cd6-e022-4d82-9898-6b78114cfb01", "type": "FirstRankPaymentAgreed",...}, headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]

But what it actually produces is this:

GenericMessage [payload=byte[2151], headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]

spring-cloud-stream-schema 2.2.1.RELEASE
spring-cloud-stream 3.2.2
spring-cloud-starter-stream-kafka 3.2.5
spring-cloud-stream-binder-kafka-streams 3.2.5
spring-cloud-function-context 3.2.1
kafka-avro-serializer 5.3.0
spring-kafka 2.9.0
org.apache.avro.avro 1.11.1

I'm using the reactive functional style:

Function<Flux<Message<Object>>, Flux<Message>> handler() {
    return flux -> flux.map(message ->
            MessageBuilder.withPayload((FirstRankPaymentAgreed) message.getPayload()).build());
}
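For context, here is a minimal, self-contained sketch of how those fragments might fit into a complete bean. It is an assumption about the surrounding code, not the original poster's actual class; the function name matches the definition in the YAML below, and the explicit contentType header is an addition for illustration only:

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

import reactor.core.publisher.Flux;

@Configuration
public class ComposerEventHandlers { // hypothetical class name

    @Bean
    public Function<Flux<Message<Object>>, Flux<Message<FirstRankPaymentAgreed>>> dispatchConsumerFromTempoComposerEvent() {
        return flux -> flux.map(message -> MessageBuilder
                // same cast as in the question; it fails at runtime if the payload is still byte[]
                .withPayload((FirstRankPaymentAgreed) message.getPayload())
                // set the outbound content type explicitly instead of relying on the binding default
                .setHeader("contentType", MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}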

The result of this builder is the received message: GenericMessage [payload=byte[1056], headers={contentType=application/json, id=7d3b65c1-11d8-0fb2-a277-0603f58fd516, timestamp=1672174971194}]
In the payload we get a byte array, not JSON.
I want something like GenericMessage [payload={"id": "254335d0-b631-454e-98de-d2d5129af4c0", "type": "ObjectClass", "delta"... instead.

cloud:
  stream:
    function:
      definition: dispatchConsumerFromTempoComposerEvent
    bindings:
      dispatchConsumerFromTempoComposerEvent-in-0:
        destination: tempo-composer-event
      dispatchConsumerFromTempoComposerEvent-out-0:
        destination: tempo-composer-event
        contentType: application/json  # I tried adding this
    kafka:
      binder:
        auto-create-topics: false
        consumer-properties:
          value:
            subject:
              name:
                strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          schema.registry.url: http://localhost:8081
          specific.avro.reader: true
        producer-properties:
          value:
            subject:
              name:
                strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy  # I tried adding this
          key.serializer: org.apache.kafka.common.serialization.StringSerializer
          value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
          schema.registry.url: http://localhost:8081
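As an aside that is not stated in the original post: with the Kafka binder, the value.serializer and value.deserializer entries under producer-properties and consumer-properties configure the Kafka clients, but whether they actually produce the payload the binding sees is normally governed by the native encoding/decoding binding properties. A minimal sketch of those properties, assuming the same binding names and that the intent is to let the Confluent Avro (de)serializers handle the payload:

spring:
  cloud:
    stream:
      bindings:
        dispatchConsumerFromTempoComposerEvent-in-0:
          destination: tempo-composer-event
          consumer:
            use-native-decoding: true   # let KafkaAvroDeserializer build the inbound payload
        dispatchConsumerFromTempoComposerEvent-out-0:
          destination: tempo-composer-event
          producer:
            use-native-encoding: true   # let KafkaAvroSerializer build the outbound record value

If that is the intended setup, the inbound payload should arrive as the generated Avro class rather than byte[].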

I want to migrate the project to Spring Cloud Stream. The "legacy" code:

[legacy code snippet not recoverable from the source]

This legacy code produces:

GenericMessage [payload={"id": "efb90cd6-e022-4d82-9898-6b78114cfb01", "type": "FirstRankPaymentAgreed",...},  headers={deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=cucumber-test1, contentType=application/json...}]

The payload is in JSON format. After migrating to Spring Cloud Stream, I get [payload=byte[2151]... instead.

Answer 1 (vwkv1x7d):

I ended up simply using the schema registry. Rather than relying on the converted message payload to retrieve the class and its schema, I convert the byte array into the final Avro object myself, using the schema registry and the schema id: ignore the payload bytes as such, read the schema id from them, and use it to look up the correct schema. You can just annotate your application class and add @EnableSchemaRegistry. Your function signature then looks like this:

@Bean
public Function<Flux<Message<Object>>, Flux<Message<Object>>> handler() { // method name is illustrative
    // read the byte[] payload, resolve its schema via the registry, and map it to the target class
}

Read the byte array, extract the schema id, look up the corresponding schema in the schema registry, and convert the byte array into the correct class.
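As a rough illustration of that approach (a sketch, not the answerer's actual code): the answer refers to Spring's @EnableSchemaRegistry, while the sketch below uses Confluent's KafkaAvroDeserializer, which is already among the listed dependencies, to do the registry lookup and decoding. It assumes the Confluent wire format, where the record value starts with a magic byte followed by a 4-byte schema id, and the registry URL from the configuration above:

import java.nio.ByteBuffer;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class AvroPayloadDecoder { // hypothetical helper class

    private final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

    public AvroPayloadDecoder() {
        // isKey = false: we are decoding record values
        deserializer.configure(Map.of(
                "schema.registry.url", "http://localhost:8081",
                "specific.avro.reader", true), false);
    }

    // Reads the schema id out of the Confluent wire format (magic byte + 4-byte id);
    // this is the id the answer suggests using to find the right schema.
    public int schemaIdOf(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        buffer.get();            // magic byte (0x0)
        return buffer.getInt();  // schema id as registered in the schema registry
    }

    // Lets KafkaAvroDeserializer resolve the schema by id and decode the bytes;
    // with specific.avro.reader=true it returns the generated Avro class
    // (e.g. FirstRankPaymentAgreed) rather than a GenericRecord.
    public Object decode(String topic, byte[] payload) {
        return deserializer.deserialize(topic, payload);
    }
}

Inside the function above, one would call decode(...) on message.getPayload() before building the outgoing Message, instead of casting the raw byte array.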
