如何在spring云流应用程序中反序列化messagepack

hgb9j2n6  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(377)

我正在尝试使用springcloudstream创建一个kafka流应用程序,但是正在努力反序列化输入消息,这些消息的值已经使用messagepack进行了编码。
到目前为止我得到的是:

// TransactionApplication.java

@SpringBootApplication
public class TransactionApplication {

  public static void main(String[] args) {
    SpringApplication.run(TransactionApplication.class, args);
  }

  public static class TransactionConsumer {

    @Bean
    public Serde<Transaction> transactionSerde() {
      ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
      return new JsonSerde<Transaction>(mapper);
    }

    @Bean
    public Consumer<KStream<String, Transaction>> process() {
      return input -> input.foreach((key, value) -> {
        System.out.println("Key: " + key + " Value: " + value);
      });
    }
  }
}
// Transaction.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
  String item;
  Number amount;
}

我得到一个错误:
illegalstateexception:头中没有类型信息,也没有提供默认类型。
我的申请是:

spring.cloud.stream:
  bindings:
    process-in-0:
      destination: transactions
  kafka:
    streams:
      binder:
        applicationId: transactions-application
        configuration:
          commit.interval.ms: 100

包括之后 spring.json.value.default.type: com.example.Transaction 在下面 configuration 我的应用程序的节点。yml,我得到另一个错误。见下文。

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
 at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]
rur96b6h

rur96b6h1#

您需要向反序列化程序提供一个提示,告诉它要从编码的负载创建什么对象。
如果记录是由springjsonserializer创建的,那么提示就在头中。
如果没有,则必须在streams配置中提供提示。
你需要展示你的能力 application.yml/properties .

相关问题