我正在尝试使用springcloudstream创建一个kafka流应用程序,但是正在努力反序列化输入消息,这些消息的值已经使用messagepack进行了编码。
到目前为止我得到的是:
// TransactionApplication.java
@SpringBootApplication
public class TransactionApplication {
public static void main(String[] args) {
SpringApplication.run(TransactionApplication.class, args);
}
public static class TransactionConsumer {
@Bean
public Serde<Transaction> transactionSerde() {
ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
return new JsonSerde<Transaction>(mapper);
}
@Bean
public Consumer<KStream<String, Transaction>> process() {
return input -> input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
}
// Transaction.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
String item;
Number amount;
}
我得到一个错误:
illegalstateexception:头中没有类型信息,也没有提供默认类型。
我的申请是:
spring.cloud.stream:
bindings:
process-in-0:
destination: transactions
kafka:
streams:
binder:
applicationId: transactions-application
configuration:
commit.interval.ms: 100
包括之后 spring.json.value.default.type: com.example.Transaction
在下面 configuration
我的应用程序的节点。yml,我得到另一个错误。见下文。
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]
1条答案
按热度按时间rur96b6h1#
您需要向反序列化程序提供一个提示,告诉它要从编码的负载创建什么对象。
如果记录是由springjsonserializer创建的,那么提示就在头中。
如果没有,则必须在streams配置中提供提示。
你需要展示你的能力
application.yml/properties
.