我有osgi框架,在这个框架中,我接受一个包中的rest调用,rest调用中接收的数据被发送给kafka brocker。还有另一个bundle将使用来自brocker的消息。
如果我在rest bundle之前初始化kafka使用者bundle,则永远不会调用rest bundleactivator,因为代码在kafka使用者代码的while循环中运行。如果我在消费包之前初始化rest包,消费包就永远不会启动。
Kafka束激活器代码如下:
public class KafkaConsumerActivator implements BundleActivator {
private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
private static final String GROUP_ID = "group.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String KEY_DESERIALIZER = "key.deserializer";
private ConsumerConnector consumerConnector;
private KafkaConsumer<String, String> consumer;
private static final String VALUE_DESERIALIZER = "value.deserializer";
public void start(BundleContext context) throws Exception {
Properties properties = new Properties();
properties.put(ZOOKEEPER_CONNECT,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
properties.put(BOOTSTRAP_SERVERS,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(properties);
try {
consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
}
1条答案
按热度按时间vxqlmq5t1#
永远不要在激活器的start方法中做一些花费很长时间的事情。它将阻塞整个osgi框架。
最好在一个额外的线程中执行整个连接和循环。在stop方法中,您可以告诉这个线程退出。