kafka streams无法反序列化值,因此org.apache.avro.generic.genericdata$record不能转换为错误

eimct9ow  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(269)

我试图用avro数据测试kafka流,但它无法反序列化值,org.apache.avro.generic.genericdata$record不能转换为错误。
有什么线索吗?
下面是我的源代码
库版本
斯卡拉=2.11.11
'kafka-streams-scala_2.11',版本:'2.4.0'
mockedstreams 2.11',版本:'3.3.0'

import com.sksamuel.avro4s.AvroSchema
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroDeserializerConfig, KafkaAvroSerializer}
import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

import scala.collection.JavaConverters._

case class Person(var firstName: String, var lastName: String) extends SpecificRecordBase {

  def this() = this("", "")

  override def get(field: Int): AnyRef = {
    field match {
      case 0 => firstName.asInstanceOf[AnyRef]
      case 1 => lastName.asInstanceOf[AnyRef]
      case _ => new AvroRuntimeException("Bad Index")
    }
  }

  override def put(field: Int, value: Any): Unit = {
    field match {
      case 0 => this.firstName = value.asInstanceOf[String]
      case 1 => this.lastName = value.asInstanceOf[String]
      case _ => new AvroRuntimeException("Bad Index")
    }
  }

  override def getSchema: Schema = AvroSchema[Person]
}

object AvroSerde {

  def avroSerde[T](props: Map[String, AnyRef]): Serde[T] = {

    val avroSerializer = new KafkaAvroSerializer
    avroSerializer.configure(props.asJava, false)

    val avroDeserializer = new KafkaAvroDeserializer
    avroDeserializer.configure(props.asJava, false)

    Serdes.serdeFrom(
      avroSerializer.asInstanceOf[Serializer[T]],
      avroDeserializer.asInstanceOf[Deserializer[T]]
    )
  }

  val avroSpecificSerde: Serde[Person] = {

    val props = Map(
      "schema.registry.url" -> "mock://mock.schema.registry.url",
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG   -> true.toString
    )

    avroSerde[Person](props)
  }

}

object PersonTopology {

   def personTopology(builder: StreamsBuilder): Topology = {

    implicit val valueSerDe: Serde[Person] = AvroSerde.avroSpecificSerde

    builder
      .stream[String, Person]("test-person-one")
      .to("test-person-two")

    builder.build()
  }
}

测试代码

import java.util.Collections

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.scalatest.{FunSuite, Matchers}
class MockedPersonTest extends FunSuite with Matchers {

  val strings = new StringSerde
  val avroSerde = AvroSerde.avroSpecificSerde
  avroSerde.configure(Collections.singletonMap("schema.registry.url", "mock://mock.schema.registry.url"), false)

  val person = Person("x", "y")

  test("simple avro test") {
    val input = List("person-one" -> person)
    val expectedOutPut = List(person)

    MockedStreams()
      .topology(PersonTopology.personTopology)
      .input[String, Person]("test-person-one", strings, avroSerde, input)
      .output[String, Person]("test-person-two", strings, avroSerde, expectedOutPut.size).map(_._2) shouldBe expectedOutPut
  }

}

获取以下错误

[32mMockedPersonTest:[0m
[31m- simple avro test***FAILED***(269 milliseconds)[0m
[31m  java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.mocked.Person[0m
[31m  at com.mocked.MockedPersonTest$$anonfun$1$$anonfun$apply$2.apply(MockedPersonTest.scala:22)[0m
[31m  at com.mocked.MockedPersonTest$$anonfun$1$$anonfun$apply$2.apply(MockedPersonTest.scala:22)[0m
[31m  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)[0m
[31m  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)[0m
[31m  at scala.collection.Iterator$class.foreach(Iterator.scala:891)[0m
[31m  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)[0m
[31m  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)[0m
[31m  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)[0m
[31m  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)[0m
[31m  at scala.collection.AbstractTraversable.map(Traversable.scala:104)[0m

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题