Problem with an Avro schema update

eagi6jfj, posted on 2021-06-08 in Kafka

I have a simple case class:

case class User(id: String, login: String, key: String)

I am adding a field "name":

case class User(id: String, login: String, name: String, key: String)

Then I add this field to the Avro schema (user.avsc):

{
  "namespace": "test",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "login", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "key", "type": "string" }
  ]
}

This class is used in another case class:

case class AuthRequest(user: User, session: String)

Its schema (the .avsc file for AuthRequest):

{
  "namespace": "test",
  "type": "record",
  "name": "AuthRequest",
  "fields": [
    { "name": "user", "type": "User" },
    { "name": "session", "type": "string" }
  ]
}

After that, my consumer started throwing an exception:

Consumer.committableSource(consumerSettings, Subscriptions.topics("token_service_auth_request"))
    .map { msg =>
      Try {
        val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
        val input: AvroBinaryInputStream[AuthRequest] = AvroInputStream.binary[AuthRequest](in)
        val result: AuthRequest = input.iterator.toSeq.head // <-- the exception is thrown here
        msg.committableOffset.commitScaladsl()

        (msg.record.value(), result, msg.record.key())
      } match {
        case Success((a: Array[Byte], value: AuthRequest, key: String)) =>
          log.info(s"listener got $msg -> $a -> $value")

          context.parent ! value

        case Failure(e) => e.printStackTrace()
      }
    }
    .runWith(Sink.ignore)

java.util.NoSuchElementException
    at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1104)
    at scala.collection.immutable.Stream$Empty$.head(Stream.scala:1102)
    at test.consumers.AuthRequestListener.$anonfun$new$2(AuthRequestListener.scala:39)
    at scala.util.Try$.apply(Try.scala:209)
    at test.consumers.AuthRequestListener.$anonfun$new$1(AuthRequestListener.scala:36)
    at test.consumers.AuthRequestListener.$anonfun$new$1$adapted(AuthRequestListener.scala:35)
    at akka.stream.impl.fusing.Map$$anon$9.onPush(Ops.scala:51)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:482)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:378)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:472)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:563)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:745)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:760)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
    at akka.actor.ActorCell.invoke(ActorCell.scala:557)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
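I tried cleaning the build and invalidating caches. It looks as if a previous version of the schema is cached somewhere. Any help, please!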

Answer 1 (7kjnsjlb)

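You need to make your change backward compatible: make the new field nullable and give it a default value. Messages written before the change do not contain "name", so a reader using the new schema needs a default to fill in for it.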

{
  "namespace": "test",
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "login", "type": "string" },
    { "name": "name", "type": ["null", "string"], "default": null },
    { "name": "key", "type": "string" }
  ]
}
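
The effect of the default can be sketched with the plain Apache Avro Java API (not avro4s, which the question uses). This is a minimal illustration under assumptions: the object and method names (SchemaEvolutionSketch, writeOld, readAsNew) are hypothetical, and the old schema is taken to be the User schema from the question without the "name" field. A record is written with the old schema and then read with the new one.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper object, for illustration only.
object SchemaEvolutionSketch {
  // Writer schema: User as it was before the "name" field was added (assumed).
  val oldSchema: Schema = new Schema.Parser().parse(
    """{
      |  "namespace": "test", "type": "record", "name": "User",
      |  "fields": [
      |    { "name": "id", "type": "string" },
      |    { "name": "login", "type": "string" },
      |    { "name": "key", "type": "string" }
      |  ]
      |}""".stripMargin)

  // Reader schema: the backward-compatible version from the answer.
  val newSchema: Schema = new Schema.Parser().parse(
    """{
      |  "namespace": "test", "type": "record", "name": "User",
      |  "fields": [
      |    { "name": "id", "type": "string" },
      |    { "name": "login", "type": "string" },
      |    { "name": "name", "type": ["null", "string"], "default": null },
      |    { "name": "key", "type": "string" }
      |  ]
      |}""".stripMargin)

  // Encode one record in Avro binary form using the OLD schema.
  def writeOld(id: String, login: String, key: String): Array[Byte] = {
    val record = new GenericData.Record(oldSchema)
    record.put("id", id)
    record.put("login", login)
    record.put("key", key)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](oldSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decode with both schemas: the writer schema describes the bytes,
  // the reader schema describes the shape we want back.
  def readAsNew(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](oldSchema, newSchema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val decoded = readAsNew(writeOld("1", "alice", "secret"))
    // "name" is absent from the old bytes, so it is filled from the default.
    println(decoded)
  }
}
```

On the case class side, avro4s maps a ["null", "string"] union to Option[String], so the matching field would presumably be declared as name: Option[String] = None.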
