我试图用avro数据测试kafka流,但它无法反序列化值,org.apache.avro.generic.genericdata$record不能转换为错误。
有什么线索吗?
下面是我的源代码
库版本
斯卡拉=2.11.11
'kafka-streams-scala_2.11',版本:'2.4.0'
mockedstreams 2.11',版本:'3.3.0'
import com.sksamuel.avro4s.AvroSchema
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroDeserializerConfig, KafkaAvroSerializer}
import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import scala.collection.JavaConverters._
case class Person(var firstName: String, var lastName: String) extends SpecificRecordBase {
def this() = this("", "")
override def get(field: Int): AnyRef = {
field match {
case 0 => firstName.asInstanceOf[AnyRef]
case 1 => lastName.asInstanceOf[AnyRef]
case _ => new AvroRuntimeException("Bad Index")
}
}
override def put(field: Int, value: Any): Unit = {
field match {
case 0 => this.firstName = value.asInstanceOf[String]
case 1 => this.lastName = value.asInstanceOf[String]
case _ => new AvroRuntimeException("Bad Index")
}
}
override def getSchema: Schema = AvroSchema[Person]
}
object AvroSerde {
def avroSerde[T](props: Map[String, AnyRef]): Serde[T] = {
val avroSerializer = new KafkaAvroSerializer
avroSerializer.configure(props.asJava, false)
val avroDeserializer = new KafkaAvroDeserializer
avroDeserializer.configure(props.asJava, false)
Serdes.serdeFrom(
avroSerializer.asInstanceOf[Serializer[T]],
avroDeserializer.asInstanceOf[Deserializer[T]]
)
}
val avroSpecificSerde: Serde[Person] = {
val props = Map(
"schema.registry.url" -> "mock://mock.schema.registry.url",
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
)
avroSerde[Person](props)
}
}
object PersonTopology {
def personTopology(builder: StreamsBuilder): Topology = {
implicit val valueSerDe: Serde[Person] = AvroSerde.avroSpecificSerde
builder
.stream[String, Person]("test-person-one")
.to("test-person-two")
builder.build()
}
}
测试代码
import java.util.Collections
import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.scalatest.{FunSuite, Matchers}
class MockedPersonTest extends FunSuite with Matchers {
val strings = new StringSerde
val avroSerde = AvroSerde.avroSpecificSerde
avroSerde.configure(Collections.singletonMap("schema.registry.url", "mock://mock.schema.registry.url"), false)
val person = Person("x", "y")
test("simple avro test") {
val input = List("person-one" -> person)
val expectedOutPut = List(person)
MockedStreams()
.topology(PersonTopology.personTopology)
.input[String, Person]("test-person-one", strings, avroSerde, input)
.output[String, Person]("test-person-two", strings, avroSerde, expectedOutPut.size).map(_._2) shouldBe expectedOutPut
}
}
获取以下错误
[32mMockedPersonTest:[0m
[31m- simple avro test***FAILED***(269 milliseconds)[0m
[31m java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.mocked.Person[0m
[31m at com.mocked.MockedPersonTest$$anonfun$1$$anonfun$apply$2.apply(MockedPersonTest.scala:22)[0m
[31m at com.mocked.MockedPersonTest$$anonfun$1$$anonfun$apply$2.apply(MockedPersonTest.scala:22)[0m
[31m at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)[0m
[31m at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)[0m
[31m at scala.collection.Iterator$class.foreach(Iterator.scala:891)[0m
[31m at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)[0m
[31m at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)[0m
[31m at scala.collection.AbstractIterable.foreach(Iterable.scala:54)[0m
[31m at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)[0m
[31m at scala.collection.AbstractTraversable.map(Traversable.scala:104)[0m
暂无答案!
目前还没有任何答案,快来回答吧!