我有一个简单的案例课
case class KafkaContainer(key: String, payload: AnyRef)
然后我想通过制作人把这个发给Kafka,我做这个
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))
这很有效
然后我试着吃这个
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)
这对于负载中的任何类都很有效。
但是!如果有任何问题。消费者代码失败
错误:(38,96)找不到com.sksamuel.avro4s.fromrecord[test.messages.kafkancontainer]val输入类型的证据参数的隐式值:avrobinaryinputstream[kafkancontainer]=avroinputstream.BinaryKafkanContainer
错误:(38,96)方法二进制的参数不足:(隐式证据$21:com.sksamuel.avro4s.schemafor[test.messages.kafkacontainer],隐式证据$22:com.sksamuel.avro4s.fromrecord[test.messages.kafkacontainer])com.sksamuel.avro4s.avrobinaryinputstream[test.messages.kafkacontainer]。未指定值参数证据$22。val输入:avrobinaryinputstream[kafkacontainer]=avroinputstream.binarykafkacontainer
如果我宣布它与
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
它无法用
错误:(58,71)找不到com.sksamuel.avro4s.fromvalue[object]implicit val fromrecord类型的惰性隐式值:fromrecord[kafkacontainer]=fromrecord[kafkacontainer]
错误:(58,71)lazyconverter方法的参数不足:(implicit-fromvalue:shapess.lazy[com.sksamuel.avro4s.fromvalue[object]])shapess.lazy[com.sksamuel.avro4s.fromvalue[object]]。未指定的值参数fromvalue。隐式val fromrecord:fromrecord[KafkanContainer]=fromrecord[KafkanContainer]
如果添加每个隐式编译器
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
编译失败并出错
错误:(58,69)宏扩展期间出现异常:java.lang.illegalargumentexception:需求失败:需要case类,但对象不在scala.predef$.require(predef)中。scala:277)在com.sksamuel.avro4s.fromrecord$.applyimpl(fromrecord。scala:283)隐式val fromrecordobject:fromrecord[object]=fromrecord[object]
但是如果我为某个类替换anyref-不需要隐式的,那么一切又正常了
1条答案
按热度按时间ne5o7dgx1#
我在使用any数据类型时遇到了类似的问题。必须指定此成员变量的有效类型,因为any或anyref可以是任何类型。然后使用或shapeless(另请参阅github文档)。在我的例子中,它可以是string、long、double或null,因此使用shapeless可以:
这将转换为avro中的联合类型: