我有一个简单的案例课:
case class User(id: String, login: String, key: String)
我正在添加字段“name”
case class User(id: String, login: String, name: String, key: String)
然后在avro模式(user.avsc)中添加此字段
{
"namespace": "test",
"type": "record",
"name": "User",
"fields": [
{ "name": "id", "type": "string" },
{ "name": "login", "type": "string" },
{ "name": "name", "type": "string" },
{ "name": "key", "type": "string" }
]
}
此类用于其他案例类:
case class AuthRequest(user: User, session: String)
化学(认证请求.avsc)
{
"namespace": "test",
"type": "record",
"name": "AuthRequest",
"fields": [
{ "name": "user", "type": "User" },
{ "name": "session", "type": "string" }
]
}
在那之后,我的消费者开始抛出例外
Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
.map { msg =>
Try {
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
val result: AuthRequest = input.iterator.toSeq.head !!!! here is exception
msg.committableOffset.commitScaladsl()
(msg.record.value(), result, msg.record.key())
} match {
case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
log.info(s"listener got $msg -> $a -> $value")
context.parent ! value
case Failure(e) => e.printStackTrace()
}
}
.runWith(Sink.ignore)
java.util.nosuchelementexception:scala.collection.immutable.stream$empty$.head(stream。scala:1104)在scala.collection.immutable.stream$empty$.head(stream。scala:1102)在test.consumers.authrequestlistener.$anonfun$new$2(authrequestlistener)。scala:39)在scala.util.try$.apply(try。scala:209)在test.consumers.authrequestlistener.$anonfun$新的$1(authrequestlistener。scala:36)在test.consumers.authrequestlistener.$anonfun$new$1$adapted(authrequestlistener。scala:35)在akka.stream.impl.fusing.map$$anon$9.onpush(操作。scala:51)在akka.stream.impl.fusing.graphinterpeter.processpush(graphinterpeter。scala:519)在akka.stream.impl.fusing.graphinterpeter.processevent(graphinterpeter。scala:482)在akka.stream.impl.fusing.graphinterpeter.execute(graphinterpeter。scala:378)在akka.stream.impl.fusing.graphinterpetershell.runbatch(actorgraphinterpreter。scala:588)在akka.stream.impl.fusing.graphinterpretershell$asyncinput.execute(actorgraphinterpreter)。scala:472)在akka.stream.impl.fusing.graphinterperetershell.processevent(actorgraphinterpreter。scala:563)在akka.stream.impl.fusing.actorgraphinterpreter.akka$stream$impl$fusing$actorgraphinterpreter$$processevent(actorgraphinterpreter)。scala:745)在akka.stream.impl.fusing.actorgraphinterpreter$$anonfun$接收$1.applyorelse(actorgraphinterpreter。scala:760)在阿克卡。演员。演员。周围接收(演员。scala:517)在akka.actor.actor.aroundreceive$(actor。scala:515)在akka.stream.impl.fusing.actorgraphinterpreter.aroundreceive(actorgraphinterpreter。scala:670)在akka.actor.actorcell.receivemessage(actorcell。scala:588)在akka.actor.actorcell.invoke(actorcell。scala:557)在akka.dispatch.mailbox.processmailbox(邮箱。scala:258)在akka.dispatch.mailbox.run(邮箱。scala:225)在akka.dispatch.mailbox.exec(邮箱。scala:235)在akka.dispatch.forkjoin.forkjointask.doexec(forkjointask。java:260)在akka.dispatch.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool。java:1339)在akka.dispatch.forkjoin.forkjoinpool.runworker(forkjoinpool。java:1979)在akka.dispatch.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread。java:107)
我试图清理构建和无效缓存-我似乎有一些缓存模式的前一个版本有一些帮助请!
1条答案
按热度按时间7kjnsjlb1#
您需要使您的更改向后兼容,使新字段可为空并向其添加默认值。