public class MessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
private void run() {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
String topic = "mytopic";
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
int thread = 0;
LOGGER.info("size: {}", streams.size());
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (final KafkaStream stream : streams) {
final int tid = thread++;
LOGGER.info("submit thread {}", tid);
executorService.execute(new Runnable() {
@Override
public void run() {
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) {
byte[] key = messageAndMetadata.key();
String message = new String(messageAndMetadata.message());
LOGGER.info("key: {} message: {} thread: {}", key, message, tid);
}
}
});
}
if (consumerConnector != null)
consumerConnector.shutdown();
}
public static void main(String[] args) {
new MessageHandler().run();
}
}
运行此使用者后,我得到以下异常:
WARN 2016-08-13 22:46:56.969] [testgroup_debian-1471099616127-8c8586c4-leader-finder-thread] kafka.utils.Logging$class.warn(Logging.scala:89) [Fetching topic metadata with correlation id 0 for topics [Set(mytopic)] from broker [BrokerEndPoint(0,debian,9092)] failed]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
为什么我会得到这个例外?代理和zookeeper的配置应该可以,因为我可以使用控制台生产者/消费者发送/接收消息。
2条答案
按热度按时间blmhpbnm1#
我好像找到问题了。呼叫
在使用任何消息之前立即关闭连接。
p4tfgftt2#
使用者是单线程的,不是线程安全的。如果您想从两个线程中消费,那么每个线程都需要自己的线程
ConsumerConnector
示例。