java—如何在osgi框架中配置kafka 0.9.0.0使用者?

zvokhttg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(367)

我有osgi框架,在这个框架中,我接受一个包中的rest调用,rest调用中接收的数据被发送给kafka brocker。还有另一个bundle将使用来自brocker的消息。
如果我在rest bundle之前初始化kafka使用者bundle,则永远不会调用rest bundleactivator,因为代码在kafka使用者代码的while循环中运行。如果我在消费包之前初始化rest包,消费包就永远不会启动。
Kafka束激活器代码如下:

public class KafkaConsumerActivator implements BundleActivator {

    private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
    private static final String GROUP_ID = "group.id";
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String KEY_DESERIALIZER = "key.deserializer";

    private ConsumerConnector consumerConnector;

    private KafkaConsumer<String, String> consumer;

    private static final String VALUE_DESERIALIZER = "value.deserializer";
    public void start(BundleContext context) throws Exception {

        Properties properties = new Properties();
        properties.put(ZOOKEEPER_CONNECT,
            MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
        properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
        properties.put(BOOTSTRAP_SERVERS,
            MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
        properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
        properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
        consumer = new KafkaConsumer<>(properties);

        try {
            consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }
}
vxqlmq5t

vxqlmq5t1#

永远不要在激活器的start方法中做一些花费很长时间的事情。它将阻塞整个osgi框架。
最好在一个额外的线程中执行整个连接和循环。在stop方法中,您可以告诉这个线程退出。

相关问题