Apache Avro: serialize a record in Java & deserialize it in Scala using Kafka

d6kp6zgx · posted 2021-06-04 in Kafka

I have a producer using Java 11/Spring and Apache Avro, and a consumer using Scala 2.12 and Akka. If I'm not mistaken, an Avro payload should be language-agnostic. However, when I try to read the value of the ConsumerRecord in the Scala consumer, I get a ClassCastException:

java.lang.ClassCastException: class com.example.schema.DetailsSchema cannot be cast to class com.example.schema.scala.DetailsSchema (com.example.schema.DetailsSchema and com.example.schema.scala.DetailsSchema are in unnamed module of loader 'app'

I am using two plugins to generate the Java and Scala classes from my schemas:

    "com.commercehub.gradle.plugin.avro" version "0.9.1"
    "com.zlad.gradle.avrohugger" version "0.5.0"

For the Scala classes I simply append a .scala package to the end of the namespace to avoid clashes with the Java-generated classes.
The Avro schemas and the Java/Scala classes live in a common JAR built with the plugins above. The Java classes are generated in the com.example.schema package, the Scala classes in com.example.schema.scala, and both extend org.apache.avro.specific.SpecificRecordBase.
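For reference, the Scala side of that layout looks roughly like the sketch below. This is only an illustration: the actual .avsc is not shown here, so the single _id field (the only one referenced in the consumer code further down) and its string type are assumptions, and the real avrohugger-generated class carries the full schema.

    package com.example.schema.scala

    import org.apache.avro.Schema
    import org.apache.avro.specific.SpecificRecordBase

    // Hypothetical shape of the generated Scala record; the real one is produced
    // by avrohugger from the shared .avsc and has all of the schema's fields.
    case class DetailsSchema(var _id: String) extends SpecificRecordBase {
      def this() = this("")

      override def getSchema: Schema = DetailsSchema.SCHEMA$

      override def get(field: Int): AnyRef = field match {
        case 0 => _id
        case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
      }

      override def put(field: Int, value: Any): Unit = field match {
        case 0 => _id = value.toString
        case _ => throw new org.apache.avro.AvroRuntimeException("Bad index")
      }
    }

    object DetailsSchema {
      // Assumed minimal schema; the Java classes are generated from the same
      // .avsc but with namespace com.example.schema instead.
      val SCHEMA$ : Schema = new Schema.Parser().parse(
        """{"type":"record","name":"DetailsSchema","namespace":"com.example.schema.scala",
          |"fields":[{"name":"_id","type":"string"}]}""".stripMargin)
    }

The Java classes generated by the commercehub plugin have the same record name but live in com.example.schema, which is exactly the pair of types named in the ClassCastException above.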
My producer is a standard Spring application using Spring Kafka & Confluent's KafkaAvroSerializer.
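The Spring configuration itself isn't shown here; as a rough sketch of what the serializer side boils down to (written as a plain KafkaProducer in Scala for consistency with the rest of this post rather than the actual Java/Spring setup, reusing the broker address, schema-registry URL and topic name that appear in the consumer code below; the details value is assumed):

    import java.util.Properties

    import com.example.schema.DetailsSchema // the Java-generated record
    import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
    // the serializer registers/looks up the writer schema in the schema registry
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

    val producer = new KafkaProducer[String, DetailsSchema](props)
    // `details` is an instance of the Java-generated com.example.schema.DetailsSchema
    // producer.send(new ProducerRecord[String, DetailsSchema]("details", details))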
My consumer is an Akka application that consumes with Akka Streams Kafka. The consumer code:

    import akka.kafka.scaladsl.Consumer
    import akka.kafka.scaladsl.Consumer.DrainingControl
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.stream.scaladsl.Sink
    import com.example.schema.scala.DetailsSchema
    import io.confluent.kafka.serializers.{AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization._
    import scala.collection.JavaConverters._

    // `system` (the ActorSystem) and `log` are provided by the enclosing application.

    val kafkaAvroSerDeConfig = Map[String, Any](
      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081",
      // return generated SpecificRecord classes instead of GenericRecord
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
    )

    val kafkaConsumerSettings: ConsumerSettings[String, DetailsSchema] = {
      val kafkaAvroDeserializer = new KafkaAvroDeserializer()
      kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false) // false = used for values, not keys
      val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[DetailsSchema]]

      ConsumerSettings(system, new StringDeserializer, deserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("xt-collector")
        .withProperties(
          (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"),
          (ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"),
          (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
    }

    val control = Consumer
      .plainSource(kafkaConsumerSettings, Subscriptions.topics("details"))
      .map { msg =>
        try {
          val details = msg.value() // HERE I get the class cast exception:
          // java.lang.ClassCastException: class com.example.schema.DetailsSchema cannot be cast to
          // class com.example.schema.scala.DetailsSchema (com.example.schema.DetailsSchema and
          // com.example.schema.scala.DetailsSchema are in unnamed module of loader 'app'
          log.info(s"Received ${details._id} to sink into mongo/elastic")
          details
        } catch {
          case e: Exception =>
            log.error("Error while deserializing details", e)
            e.printStackTrace()
        }
      }
      // .via(detailsCollection.flow)  just sink into mongo/elastic
      .recover {
        case t: Throwable => log.error("Error while storing in mongo/elastic", t)
      }
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()

My question is: if both Avro records are generated from the same schema, is it possible to produce the record in Java and deserialize it on the other end into a Scala case class?

No answers yet.
