spring自动连线共享队列nullpointerexception

pinkon5k  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(344)

我第一次使用spring,并尝试实现一个共享队列,其中kafka侦听器将消息放在共享队列上,threadmanager最终将对从共享队列中取出的项执行多线程处理。以下是我当前的实现:
听众:

@Component
public class Listener {
    @Autowired
    private QueueConfig queueConfig;
    private ExecutorService executorService;
    private List<Future> futuresThread1 = new ArrayList<>();
    public Listener() {
        Properties appProps = new AppProperties().get();
        this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
    }
    //TODO: how can I pass an approp into this annotation?
    @KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
    public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
        {
            futuresThread1.add(executorService.submit(new Runnable() {
                    @Override public void run() {
                        try{
                            queueConfig.blockingQueue().put(record);
//                            System.out.println(queueConfig.blockingQueue().take());
                        } catch (Exception e){
                            System.out.print(e.toString());
                        }

                    }
            }));
        }
}

队列:

@Configuration
public class QueueConfig {
    private Properties appProps = new AppProperties().get();

    @Bean
    public BlockingQueue<ConsumerRecord> blockingQueue() {
        return new ArrayBlockingQueue<>(
                Integer.parseInt(appProps.getProperty("blockingQueueSize"))
        );
    }
}

线程管理器:

@Component
public class ThreadManager {
    @Autowired
    private QueueConfig queueConfig;
    private int threads;

    public ThreadManager() {
        Properties appProps = new AppProperties().get();
        this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
    }

    public void run() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            while (true){
                queueConfig.blockingQueue().take();
            }
        } catch (Exception e){
            System.out.print(e.toString());
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}

最后一点,一切都从主线开始:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);
        ThreadManager threadManager = new ThreadManager();
        try{
            threadManager.run();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}

问题
当在调试器中运行此命令时,我可以看出侦听器正在向队列中添加内容。当threadmanager离开共享队列时,它告诉我队列为null,我得到一个npe。似乎autowiring无法将侦听器使用的队列连接到threadmanager。谢谢你的帮助。

k4ymrczo

k4ymrczo1#

你用Spring´s programatic,即所谓的“javaconfig”,设置springbean(用 @Configuration 方法注解为 @Bean ). 通常在应用程序启动时,spring会调用 @Bean 方法,并在其应用程序上下文中注册它们(如果作用域是singleton-默认值-这只会发生一次!)。不用打电话了 @Bean 方法直接在代码中的任何位置。。。你不能,否则你会得到一个单独的,新的示例,可能是没有完全配置!
相反,你需要注射 BlockingQueue<ConsumerRecord> 您在 QueueConfig.blockingQueue() 你的方法 ThreadManager . 因为队列似乎是 ThreadManager 为了工作,我让spring通过构造函数注入它:

@Component
public class ThreadManager {

    private int threads;

    // add instance var for queue...
    private BlockingQueue<ConsumerRecord> blockingQueue;

    // you could add @Autowired annotation to BlockingQueue param,
    // but I believe it's not mandatory... 
    public ThreadManager(BlockingQueue<ConsumerRecord> blockingQueue) {
        Properties appProps = new AppProperties().get();
        this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
        this.blockingQueue = blockingQueue;
    }

    public void run() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            while (true){
                this.blockingQueue.take();
            }
        } catch (Exception e){
            System.out.print(e.toString());
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}

只想澄清一件事:默认情况下 @Bean 方法被spring用来为这个bean分配一个唯一的id(方法名==bean id)。所以你的方法被称为 blockingQueue ,表示您的 BlockingQueue<ConsumerRecord> 示例也将以id注册 blockingQueue 在应用程序上下文中。新的构造函数参数也被命名为 blockingQueue 而且是类型匹配 BlockingQueue<ConsumerRecord> . 简化了,这是spring查找和注入/连接依赖项的一种方法。

kninwzqo

kninwzqo2#

这就是问题所在: ThreadManager threadManager = new ThreadManager(); 因为您是手动创建示例,所以不能使用spring提供的di。
一个简单的解决方案是实现commandlinerunner,它将在完成 SourceAccountListenerApp 初始化:

@SpringBootApplication
public class SourceAccountListenerApp {
    public static void main(String[] args) {
        SpringApplication.run(SourceAccountListenerApp.class, args);            
    }

    // Create the CommandLineRunner Bean and inject ThreadManager 
    @Bean
    CommandLineRunner runner(ThreadManager manager){
        return args -> {
            manager.run();
        };
    }

}

相关问题