Converting an Avro GenericRecord to a SpecificData object while converting long to Instant

Posted by btqmn9zl on 2021-06-04 in Kafka

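Hi, I am trying to convert the GenericRecord I receive from Kafka into a specific object, which I want to use further down the line.
This is my code: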

public void listen(ConsumerRecord<String, GenericRecord> consumerRecord) {
    // Copy the generic record into the generated specific class
    TxnEngineEvent event = (TxnEngineEvent) SpecificData.get().deepCopy(TxnEngineEvent.SCHEMA$, consumerRecord.value());
}

The conversion from long to Instant does not work. I get this exception:

at org.springframework.kafka.listener.ContainerStoppingErrorHandler.handle(ContainerStoppingErrorHandler.java:65) ~[spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1776) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1693) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1619) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1522) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1263) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1009) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.billpay.klarnabankingservice.consumer.kafka.KafkaSepaIncomingConsumer.listen(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, org.apache.avro.generic.GenericRecord>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant; nested exception is java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1788) [spring-kafka-2.4.5.RELEASE.jar:2.4.5.RELEASE]
    ... 10 common frames omitted
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
    at com.klarna.messaging.Metadata.put(Metadata.java:204) ~[classes/:na]

The field that causes the problem has this schema:

{"name":"occurred_at","type":{"type":"long","logicalType":"timestamp-millis"}}

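I was able to debug down to the place in the Avro code where it tries to convert the value. The problem is that it selects the conversion based on the class of the field value. Because the value's class is java.lang.Long while the conversion is registered for java.time.Instant, nothing matches. Is there something I am missing? I thought about solving this by implementing my own conversion, but it would have to be of type java.lang.Long, which makes no sense, and I still could not convert the value to java.time.Instant (there is no interface method that returns a generic object).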
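For context, the Avro specification defines timestamp-millis as a long counting milliseconds since the Unix epoch, so the conversion that is missing here is a plain `Instant.ofEpochMilli` call. The sketch below uses only the JDK to show that mapping. The class `TimestampMillisDemo` and the helper `toInstant` are hypothetical names for illustration, not part of Avro, and this is not a fix for `deepCopy` itself.

```java
import java.time.Instant;

final class TimestampMillisDemo {

    // Hypothetical helper (not an Avro API): maps the raw value of a
    // timestamp-millis field to an Instant.
    public static Instant toInstant(Object raw) {
        if (raw instanceof Instant) {
            // Already converted, e.g. by a registered logical-type conversion
            return (Instant) raw;
        }
        if (raw instanceof Long) {
            // timestamp-millis: milliseconds since 1970-01-01T00:00:00Z
            return Instant.ofEpochMilli((Long) raw);
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + raw);
    }

    public static void main(String[] args) {
        // The value as it would sit in the generic record: a boxed Long
        Object raw = 1622764800000L;
        System.out.println(toInstant(raw)); // prints 2021-06-04T00:00:00Z
    }
}
```

A direct cast such as `(Instant) raw` on the boxed Long is what produces the ClassCastException in the stack trace above; the helper avoids it by checking the runtime type first.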

kd3sttzy #1

I found a way to do it using the Jackson ObjectMapper:

ConsumerRecord<Key, Value> singleRecord = records.poll(2000, TimeUnit.MILLISECONDS);

// JavaTimeModule adds support for java.time types such as Instant
ObjectMapper objectMapper = new ObjectMapper()
  .registerModule(new ParameterNamesModule())
  .registerModule(new Jdk8Module())
  .registerModule(new JavaTimeModule())
  .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

// Relies on the record value's toString() rendering it as JSON
Value value = objectMapper.readValue(String.valueOf(singleRecord.value()), Value.class);
