我得到了这个例外
org.apache.kafka.common.protocol.types.SchemaException
Kafka正在重新平衡
具体如下:
使用azure事件中心。使用kafkaapi访问它
“kafka enabled”=是,在azure门户中
使用:compile group:'org.apache.kafka',name:'kafka clients',version:'1.0.2'
使用消费群体
Properties properties = new Properties();
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s.servicebus.windows.net:9093", this.namespace));
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MeasurementDeSerializer.class.getName());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupName);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
我有两个客户在两台不同的电脑上
当它们都运行时,每个都从可用的32个分区中得到16个分区。
当我关闭其中一个的时候,所有的部分都被重新平衡到另一个。
对于仍在运行的示例,我将得到:
撤销的分区[16、17、18、19、20、21、22、23、24、25、26、27、28、29、30、31]
然后,我将从池循环中得到以下异常:
org.apache.kafka.common.protocol.types.schemaexception:读取字段“leader\u id”时出错:org.apache.kafka.common.protocol.types.schema.read(schema)处的字符串长度-1不能为负。java:76)在org.apache.kafka.common.protocol.apikeys.parseresponse(apikeys。java:279)在org.apache.kafka.clients.networkclient.parsestructmaybeupdatethrottletimemetrics(networkclient。java:586)在org.apache.kafka.clients.networkclient.handlecompletedreceives(networkclient。java:686)在org.apache.kafka.clients.networkclient.poll(networkclient。java:469)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:258)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:230)在org.apache.kafka.clients.consumer.internals.consumernetworkclient.poll(consumernetworkclient。java:190)在org.apache.kafka.clients.consumer.internals.abstractcoordinator.joingroupifneeded(abstractcoordinator)。java:364)在org.apache.kafka.clients.consumer.internals.abstractcoordinator.ensureReactiveGroup(abstractcoordinator.com)上。java:316)在org.apache.kafka.clients.consumer.internals.consumercoordinator.poll(consumercoordinator。java:295)在org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer。java:1146)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer。java:1111)
另一方面,当走另一条路,没有问题
启动第一个示例
示例1获取所有32个分区
启动示例2
重新平衡启动
示例1丢失了16个零件
示例2得到16个部分
知道是什么导致了这个异常吗?
1条答案
按热度按时间a14dhokn1#
对于未来的读者-问题是固定的。https://github.com/azure/azure-event-hubs-for-kafka/issues/41