Spring Kafka 模板 - 在 Spring 启动时连接到 Kafka 主题
Spring Kafka Template - Connect to Kafka Topic on Spring Boot Startup
我已经实现了一个使用 Spring Kafka 的基本 Spring 启动应用程序。我希望我的制作人在调用第一个 .send()
之前连接到 Kafka 主题,但我找不到这样做的方法。这可能吗?
显示 KafkaTemplate 仅在我在 16:12:44
触发 .send
方法后连接到 Kafka 主题的日志:
2021-11-24 16:12:12.602 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : The following profiles are active: dev
2021-11-24 16:12:13.551 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559 INFO 63930 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-24 16:12:13.559 INFO 63930 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613 INFO 63930 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-11-24 16:12:13.613 INFO 63930 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 974 ms
2021-11-24 16:12:13.989 INFO 63930 --- [ main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [springfox.documentation.swagger2.web.Swagger2Controller#getDocumentation(String, HttpServletRequest)]
2021-11-24 16:12:14.190 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-24 16:12:14.190 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2021-11-24 16:12:14.207 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2021-11-24 16:12:14.239 INFO 63930 --- [ main] s.d.s.w.s.ApiListingReferenceScanner : Scanning for api listing references
2021-11-24 16:12:14.336 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : Started KafkaProducerExampleApplication in 7.055 seconds (JVM running for 7.341)
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-24 16:12:44.551 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-24 16:12:44.649 INFO 63930 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
使用non-transactional producer
(未提供transactionIdPrefix),当您第一次调用KafkaTemplate.send时,它将委托给ProducerFactory
以获取单个实例 共 Producer
个。这个时候,因为Producer
之前没有单例,所以ProducerFactory
会为你创建这个(这就是你看到日志[=16=的原因) ]).这个生产者实例现在被所有客户used/shared。
所以如果你想预先创建上面的生产者实例,你可以直接在ProducerFactory
上调用它,例如:
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaProducerFactory.createProducer();
return kafkaTemplate;
...
关于 Linh Vu 的回答,最好不要在 bean 定义中创建连接 - 在应用程序上下文的生命周期中还为时过早。
相反,添加一个实现 SmartLifecycle
的 bean 并在 start()
中创建连接;这样,上下文将在连接之前完全初始化。
@Bean
SmartLifecycle connector(ProducerFactory<Object ,Object> pf) {
return new SmartLifecycle() {
@Override
public void stop() {
}
@Override
public void start() {
pf.createProducer().close();
}
@Override
public boolean isRunning() {
return false;
}
};
}
SmartLifecycle
bean 对我们有用,谢谢。
@Component
class KafkaProducer (
private val userChangeLogTemplate: KafkaTemplate<String, UserChangeLog>
private val kafkaProperties: MizenKafkaProperties
) : NotificationProducer{
@Bean
fun connector(pf: ProducerFactory<String, Any>): SmartLifecycle {
return object : SmartLifecycle {
override fun stop() {}
override fun start() {
pf.createProducer().close()
}
override fun isRunning(): Boolean {
return false
}
}
}
override fun sendUserChangeLog(message: UserChangeLog) {
userChangeLogTemplate.send(kafkaProperties.userChangeLogTopic, message)
}
}
我已经实现了一个使用 Spring Kafka 的基本 Spring 启动应用程序。我希望我的制作人在调用第一个 .send()
之前连接到 Kafka 主题,但我找不到这样做的方法。这可能吗?
显示 KafkaTemplate 仅在我在 16:12:44
触发 .send
方法后连接到 Kafka 主题的日志:
2021-11-24 16:12:12.602 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : The following profiles are active: dev
2021-11-24 16:12:13.551 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2021-11-24 16:12:13.559 INFO 63930 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-11-24 16:12:13.559 INFO 63930 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.53]
2021-11-24 16:12:13.613 INFO 63930 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-11-24 16:12:13.613 INFO 63930 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 974 ms
2021-11-24 16:12:13.989 INFO 63930 --- [ main] pertySourcedRequestMappingHandlerMapping : Mapped URL path [/v2/api-docs] onto method [springfox.documentation.swagger2.web.Swagger2Controller#getDocumentation(String, HttpServletRequest)]
2021-11-24 16:12:14.190 INFO 63930 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-11-24 16:12:14.190 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Context refreshed
2021-11-24 16:12:14.207 INFO 63930 --- [ main] d.s.w.p.DocumentationPluginsBootstrapper : Found 1 custom documentation plugin(s)
2021-11-24 16:12:14.239 INFO 63930 --- [ main] s.d.s.w.s.ApiListingReferenceScanner : Scanning for api listing references
2021-11-24 16:12:14.336 INFO 63930 --- [ main] c.e.k.KafkaProducerExampleApplication : Started KafkaProducerExampleApplication in 7.055 seconds (JVM running for 7.341)
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-24 16:12:44.550 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-24 16:12:44.551 INFO 63930 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-24 16:12:44.649 INFO 63930 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
使用non-transactional producer
(未提供transactionIdPrefix),当您第一次调用KafkaTemplate.send时,它将委托给ProducerFactory
以获取单个实例 共 Producer
个。这个时候,因为Producer
之前没有单例,所以ProducerFactory
会为你创建这个(这就是你看到日志[=16=的原因) ]).这个生产者实例现在被所有客户used/shared。
所以如果你想预先创建上面的生产者实例,你可以直接在ProducerFactory
上调用它,例如:
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaProducerFactory.createProducer();
return kafkaTemplate;
...
关于 Linh Vu 的回答,最好不要在 bean 定义中创建连接 - 在应用程序上下文的生命周期中还为时过早。
相反,添加一个实现 SmartLifecycle
的 bean 并在 start()
中创建连接;这样,上下文将在连接之前完全初始化。
@Bean
SmartLifecycle connector(ProducerFactory<Object ,Object> pf) {
return new SmartLifecycle() {
@Override
public void stop() {
}
@Override
public void start() {
pf.createProducer().close();
}
@Override
public boolean isRunning() {
return false;
}
};
}
SmartLifecycle
bean 对我们有用,谢谢。
@Component
class KafkaProducer (
private val userChangeLogTemplate: KafkaTemplate<String, UserChangeLog>
private val kafkaProperties: MizenKafkaProperties
) : NotificationProducer{
@Bean
fun connector(pf: ProducerFactory<String, Any>): SmartLifecycle {
return object : SmartLifecycle {
override fun stop() {}
override fun start() {
pf.createProducer().close()
}
override fun isRunning(): Boolean {
return false
}
}
}
override fun sendUserChangeLog(message: UserChangeLog) {
userChangeLogTemplate.send(kafkaProperties.userChangeLogTopic, message)
}
}