我有一个简单的javaKafka消费者,代码如下
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()&& !done){
try {
System.out.println("Parsing data");
byte[] data = it.next().message();
System.out.println("Found data: "+data);
values.add(data); // array list
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
done = true;
}
当消息被发布时,数据被成功读取,但是当它返回到checking it.hasnext()时,它将保持挂起状态,并且永远不会返回。
有什么能拖延这件事?
mïu流是一个kafkastream,如下所示:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
for (final KafkaStream stream : streams) {
// m_stream is one of these streams
}
2条答案
按热度按时间r3i60tvu1#
解决方法是添加属性
“consumer.timeout.ms”
现在,当达到超时时,将抛出consumertimeoutexception
gzszwxb42#
方法
hasNext()
正在阻塞。您可以在属性中更改阻塞的超时
consumer.timeout.ms
请注意,它将抛出TimeoutException
超时将过期时。会阅读这些关于消费者的文档:https://cwiki.apache.org/confluence/display/kafka/consumer+group+examplehttps用法:/cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example