schemaexception:读取kafka和事件中心中的字段“topics”时出错

jtjikinw  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(224)

我正在与azure事件中心和Kafka合作。我有一个生产者,将发送消息到一个主题与4个分区在事件中心。然后,4个消费者将从该主题中消费。大多数时候,它工作正常。但是,我有时会遇到这样的错误:

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': Error reading array of size XXXXXXX, only XX bytes available

有谁能告诉我为什么会发生这种情况以及如何解决这个问题的背景吗?
以下是引发错误的部分日志:

19/07/12 15:22:20.354 ERROR reactor.kafka.receiver.internals.DefaultKafkaReceiver: Unexpected exception
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': Error reading array of size 825844834, only 33 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76)
at org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeSubscription(ConsumerProtocol.java:96)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver$PollEvent.run(DefaultKafkaReceiver.java:446)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:377)
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$16(DefaultKafkaReceiver.java:319)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:396)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:480)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题