一般来说,我对java、spring和kafka都是新手。情况如下:
我使用了@kafkalistener注解来创建一个kafka消费者,看起来是这样的:
public class Listener {
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record, ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
System.out.println(record);
arrayBlockingQueue.add(record);
}
}));
}
}
我向侦听器添加了一个参数arrayblockingqueue,我希望它将kafka的消息添加到该参数中。
我遇到的问题是,我不知道如何将arrayblockingqueue传递到侦听器中,因为spring在幕后处理侦听器的示例化和运行。
我需要这个阻塞队列,以便侦听器之外的另一个对象可以访问消息并对其进行一些处理。例如,我的主要观点是:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
Properties appProps = new AppProperties().get();
ArrayBlockingQueue<ConsumerRecord> arrayBlockingQueue = new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
//TODO: This starts my listener. How do I pass the queue to it?
SpringApplication.run(SourceAccountListenerApp.class, args);
}
}
1条答案
按热度按时间egmofgnx1#
有很多方法可以将阻塞队列声明为bean。
一个例子,main:
侦听器: