avro4s无法反序列化anyref

uttx8gqw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(328)

我有一个简单的案例课

case class KafkaContainer(key: String, payload: AnyRef)

然后我想通过制作人把这个发给Kafka,我做这个

val byteArrayStream = new ByteArrayOutputStream()
      val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
      output.write(msg)
      output.close()
      val bytes = byteArrayStream.toByteArray
      producer.send(new ProducerRecord("my_topic", msg.key, bytes))

这很有效
然后我试着吃这个

Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
    .map { msg =>
      val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
      val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
      input.close()
        ...
    }.runWith(Sink.ignore)

这对于负载中的任何类都很有效。
但是!如果有任何问题。消费者代码失败
错误:(38,96)找不到com.sksamuel.avro4s.fromrecord[test.messages.kafkancontainer]val输入类型的证据参数的隐式值:avrobinaryinputstream[kafkancontainer]=avroinputstream.BinaryKafkanContainer
错误:(38,96)方法二进制的参数不足:(隐式证据$21:com.sksamuel.avro4s.schemafor[test.messages.kafkacontainer],隐式证据$22:com.sksamuel.avro4s.fromrecord[test.messages.kafkacontainer])com.sksamuel.avro4s.avrobinaryinputstream[test.messages.kafkacontainer]。未指定值参数证据$22。val输入:avrobinaryinputstream[kafkacontainer]=avroinputstream.binarykafkacontainer
如果我宣布它与

implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

它无法用
错误:(58,71)找不到com.sksamuel.avro4s.fromvalue[object]implicit val fromrecord类型的惰性隐式值:fromrecord[kafkacontainer]=fromrecord[kafkacontainer]
错误:(58,71)lazyconverter方法的参数不足:(implicit-fromvalue:shapess.lazy[com.sksamuel.avro4s.fromvalue[object]])shapess.lazy[com.sksamuel.avro4s.fromvalue[object]]。未指定的值参数fromvalue。隐式val fromrecord:fromrecord[KafkanContainer]=fromrecord[KafkanContainer]
如果添加每个隐式编译器

lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

编译失败并出错
错误:(58,69)宏扩展期间出现异常:java.lang.illegalargumentexception:需求失败:需要case类,但对象不在scala.predef$.require(predef)中。scala:277)在com.sksamuel.avro4s.fromrecord$.applyimpl(fromrecord。scala:283)隐式val fromrecordobject:fromrecord[object]=fromrecord[object]
但是如果我为某个类替换anyref-不需要隐式的,那么一切又正常了

ne5o7dgx

ne5o7dgx1#

我在使用any数据类型时遇到了类似的问题。必须指定此成员变量的有效类型,因为any或anyref可以是任何类型。然后使用或shapeless(另请参阅github文档)。在我的例子中,它可以是string、long、double或null,因此使用shapeless可以:

case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])

这将转换为avro中的联合类型:

{
    "name" : "value",
    "type" : [ "null", "string", "long", "double" ]
}

相关问题