Spring 自动装配共享队列 NullPointerException
Spring Autowired Shared Queue NullPointerException
我是第一次使用 Spring,我正在尝试实现一个共享队列,其中 Kafka 侦听器将消息放在共享队列上,还有一个 ThreadManager 最终将对它的项目执行多线程操作取消共享队列。这是我当前的实现:
听众:
@Component
public class Listener {
@Autowired
private QueueConfig queueConfig;
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
try{
queueConfig.blockingQueue().put(record);
// System.out.println(queueConfig.blockingQueue().take());
} catch (Exception e){
System.out.print(e.toString());
}
}
}));
}
}
队列:
@Configuration
public class QueueConfig {
private Properties appProps = new AppProperties().get();
@Bean
public BlockingQueue<ConsumerRecord> blockingQueue() {
return new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
}
}
线程管理器:
@Component
public class ThreadManager {
@Autowired
private QueueConfig queueConfig;
private int threads;
public ThreadManager() {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
queueConfig.blockingQueue().take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
最后,一切开始的主线程:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
ThreadManager threadManager = new ThreadManager();
try{
threadManager.run();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
问题
我可以在调试器中告诉 运行 侦听器正在向队列中添加内容。当 ThreadManager 取消共享队列时,它告诉我队列为空并且我得到一个 NPE。似乎自动装配无法将侦听器正在使用的队列连接到 ThreadManager。任何帮助表示赞赏。
您使用 Spring 的编程方式,即所谓的 'JavaConfig',设置 Spring bean 的方式(类 用 @Configuration
注释,方法注释@Bean
)。通常在应用程序启动时 Spring 会在幕后调用那些 @Bean
方法并将它们注册到它的应用程序上下文中(如果范围是单例 - 默认 - 这只会发生一次!)。无需在代码中的任何地方直接调用那些 @Bean
方法......你不能这样做,否则你将获得一个可能未完全配置的单独的新实例!
相反,您需要将 QueueConfig.blockingQueue()
方法中 'configured' 的 BlockingQueue<ConsumerRecord>
注入到 ThreadManager
中。由于队列似乎是 ThreadManager
工作的强制依赖项,我会让 Spring 通过构造函数注入它:
@Component
public class ThreadManager {
private int threads;
// add instance var for queue...
private BlockingQueue<ConsumerRecord> blockingQueue;
// you could add @Autowired annotation to BlockingQueue param,
// but I believe it's not mandatory...
public ThreadManager(BlockingQueue<ConsumerRecord> blockingQueue) {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
this.blockingQueue = blockingQueue;
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
this.blockingQueue.take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
再澄清一件事:默认情况下,Spring 使用 @Bean
方法的方法名称为该 bean 分配一个唯一 ID(方法名称 == bean id)。所以你的方法被称为 blockingQueue
,意味着你的 BlockingQueue<ConsumerRecord>
实例也将在应用程序上下文中使用 id blockingQueue
注册。新的构造函数参数也被命名为 blockingQueue
并且它的类型匹配 BlockingQueue<ConsumerRecord>
。简而言之,这是 Spring 查找和 injects/wires 依赖关系的一种方式。
这是问题所在:
ThreadManager threadManager = new ThreadManager();
由于您是手动创建实例,因此无法使用Spring提供的DI。
一个简单的解决方案是实现一个 CommandLineRunner,它将在完成 SourceAccountListenerApp
初始化后执行:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
}
// Create the CommandLineRunner Bean and inject ThreadManager
@Bean
CommandLineRunner runner(ThreadManager manager){
return args -> {
manager.run();
};
}
}
我是第一次使用 Spring,我正在尝试实现一个共享队列,其中 Kafka 侦听器将消息放在共享队列上,还有一个 ThreadManager 最终将对它的项目执行多线程操作取消共享队列。这是我当前的实现:
听众:
@Component
public class Listener {
@Autowired
private QueueConfig queueConfig;
private ExecutorService executorService;
private List<Future> futuresThread1 = new ArrayList<>();
public Listener() {
Properties appProps = new AppProperties().get();
this.executorService = Executors.newFixedThreadPool(Integer.parseInt(appProps.getProperty("listenerThreads")));
}
//TODO: how can I pass an approp into this annotation?
@KafkaListener(id = "id0", topics = "bose.cdp.ingest.marge.boseaccount.normalized")
public void listener(ConsumerRecord<?, ?> record) throws InterruptedException, ExecutionException
{
futuresThread1.add(executorService.submit(new Runnable() {
@Override public void run() {
try{
queueConfig.blockingQueue().put(record);
// System.out.println(queueConfig.blockingQueue().take());
} catch (Exception e){
System.out.print(e.toString());
}
}
}));
}
}
队列:
@Configuration
public class QueueConfig {
private Properties appProps = new AppProperties().get();
@Bean
public BlockingQueue<ConsumerRecord> blockingQueue() {
return new ArrayBlockingQueue<>(
Integer.parseInt(appProps.getProperty("blockingQueueSize"))
);
}
}
线程管理器:
@Component
public class ThreadManager {
@Autowired
private QueueConfig queueConfig;
private int threads;
public ThreadManager() {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
queueConfig.blockingQueue().take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
最后,一切开始的主线程:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
ThreadManager threadManager = new ThreadManager();
try{
threadManager.run();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
问题
我可以在调试器中告诉 运行 侦听器正在向队列中添加内容。当 ThreadManager 取消共享队列时,它告诉我队列为空并且我得到一个 NPE。似乎自动装配无法将侦听器正在使用的队列连接到 ThreadManager。任何帮助表示赞赏。
您使用 Spring 的编程方式,即所谓的 'JavaConfig',设置 Spring bean 的方式(类 用 @Configuration
注释,方法注释@Bean
)。通常在应用程序启动时 Spring 会在幕后调用那些 @Bean
方法并将它们注册到它的应用程序上下文中(如果范围是单例 - 默认 - 这只会发生一次!)。无需在代码中的任何地方直接调用那些 @Bean
方法......你不能这样做,否则你将获得一个可能未完全配置的单独的新实例!
相反,您需要将 QueueConfig.blockingQueue()
方法中 'configured' 的 BlockingQueue<ConsumerRecord>
注入到 ThreadManager
中。由于队列似乎是 ThreadManager
工作的强制依赖项,我会让 Spring 通过构造函数注入它:
@Component
public class ThreadManager {
private int threads;
// add instance var for queue...
private BlockingQueue<ConsumerRecord> blockingQueue;
// you could add @Autowired annotation to BlockingQueue param,
// but I believe it's not mandatory...
public ThreadManager(BlockingQueue<ConsumerRecord> blockingQueue) {
Properties appProps = new AppProperties().get();
this.threads = Integer.parseInt(appProps.getProperty("threadManagerThreads"));
this.blockingQueue = blockingQueue;
}
public void run() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
try {
while (true){
this.blockingQueue.take();
}
} catch (Exception e){
System.out.print(e.toString());
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}
}
再澄清一件事:默认情况下,Spring 使用 @Bean
方法的方法名称为该 bean 分配一个唯一 ID(方法名称 == bean id)。所以你的方法被称为 blockingQueue
,意味着你的 BlockingQueue<ConsumerRecord>
实例也将在应用程序上下文中使用 id blockingQueue
注册。新的构造函数参数也被命名为 blockingQueue
并且它的类型匹配 BlockingQueue<ConsumerRecord>
。简而言之,这是 Spring 查找和 injects/wires 依赖关系的一种方式。
这是问题所在:
ThreadManager threadManager = new ThreadManager();
由于您是手动创建实例,因此无法使用Spring提供的DI。
一个简单的解决方案是实现一个 CommandLineRunner,它将在完成 SourceAccountListenerApp
初始化后执行:
@SpringBootApplication
public class SourceAccountListenerApp {
public static void main(String[] args) {
SpringApplication.run(SourceAccountListenerApp.class, args);
}
// Create the CommandLineRunner Bean and inject ThreadManager
@Bean
CommandLineRunner runner(ThreadManager manager){
return args -> {
manager.run();
};
}
}