嗨,我正试图转换我从Kafka通用记录到具体的对象,我想用下来的行。
这是我的密码
public void listen(ConsumerRecord<String, GenericRecord> consumerRecord) {
TxnEngineEvent event = (TxnEngineEvent) SpecificData.get().deepCopy(TxnEngineEvent.SCHEMA$, consumerRecord.value());
从“长”到“即时”的转换不起作用。我得到这个例外
at org.springframework.kafka.listener.ContainerStoppingErrorHandler.handle(ContainerStoppingErrorHandler.java:65) ~[spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1776) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1693) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1619) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1009) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1788) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
... 10 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
at com.klarna.messaging.Metadata.put(Metadata.java:204) ~[classes/:na]
导致此问题的字段具有此架构
{"name":"occurred_at","type":{"type":"long","logicalType":"timestamp-millis"}
我已经能够调试到avro本机代码试图转换值的地方,问题是它基于字段的类来分配转换器。因为类是java.lang.long,而转换器的类是java.time.instant,所以没有匹配。有什么我不知道的吗?我想通过实现我自己的转换器来解决这个问题,但是那需要是java.lang.long类型,这没有任何意义+我不能将值转换为java.time.instant(没有接口方法返回泛型对象)
1条答案
按热度按时间kd3sttzy1#
我找到了一种使用jackson对象Map器的方法