kafka avro-从long到datetime的转换

9nvpjoqh  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(333)

当我想发送一个包含long类型字段的avro消息时,出现以下错误:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime

我使用Confluent3.2.0和ApacheSpark2.2.0。这个错误在spark作业中抛出,spark作业处理avro消息并在控制台中打印它们。在avro模式中,相应的字段定义如下:

{\"name\": \"event_time\", \"type\": { \"type\" : \"long\", \"logicalType\": \"timestamp-millis\"}}

在由 .avsc 文件中,该字段定义如下:

private DateTime event_time;
cgfeq70w

cgfeq70w1#

我在使用Confluent4.0.0和AVRO1.8.2时遇到了类似的问题。我有一个流处理器,试图将long转换为datetime。我通过添加正确的转换克服了这个问题。在开始任何处理逻辑之前,我使用了特定的数据静态实用程序类并添加了正确的逻辑类型转换。

SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
1sbrub3j

1sbrub3j2#

在avro 1.9.x及更高版本中,符号 TimestampConversion 已经不存在了。将@user3222582中的上述内容替换为 TimestampMillisConversion 修正了avro1.9.2的编译错误

kgqe7b3p

kgqe7b3p3#

尝试将avro dependency中的avro版本升级到1.9.x,pom.xml中的avro maven插件升级到1.9.x

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.9.1</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <!--<goal>idl-protocol</goal>-->
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                <enableDecimalLogicalType>true</enableDecimalLogicalType>
                <stringType>String</stringType>
            </configuration>
        </execution>
    </executions>
</plugin>

还要确保从avro模式中删除先前生成的类并进行mvn编译。

相关问题