Kafka 在重新部署后流式传输错误 Tomcat
Kafka streams errors after redeploying Tomcat
我在我的项目中使用 kafka 流。我将我的项目编译为 war 和 运行 它在 tomcat.
我的项目如我所愿地运行,没有任何错误。如果我先停止 tomcat 然后启动它,它可以正常工作。 但是,如果我在不停止 tomcat 的情况下重新部署(取消部署和部署)服务,我会开始收到错误。我研究的时候有资料说tomcat缓存了老版本的服务。即使我应用了一些解决方案,我也无法找到解决方案。如果你能帮助我,我将不胜感激。
我想再说一遍。我的代码块正常工作。如果我 运行 在 tomcat 中第一次使用该服务,我不会收到错误消息。或者,如果我完全关闭 tomcat 并重新启动它,我不会收到错误消息。但是,如果我在不停止 tomcat 的情况下重新部署(取消部署和部署)服务,我会开始收到错误消息。
我在下面分享一个小代码块。
Properties streamConfiguration = kafkaStreamsConfiguration.createStreamConfiguration(createKTableGroupId(), new AppSerdes.DataWrapperSerde());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, DataWrapper> kTableDataWrapper = streamsBuilder.table(topicAction.getTopicName());
KTable<String, DataWrapper> kTableWithStore = kTableDataWrapper.filter((key, dataWrapper) -> key != null && dataWrapper != null, Materialized.as(createStoreName()));
kTableWithStore.toStream().filter((key, dataWrapper) -> // Filter)
.mapValues((ValueMapperWithKey<String, DataWrapper, Object>) (key, dataWrapper) -> {
// Logics
})
.to(createOutputTopicName());
this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamConfiguration);
this.kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}));
public Properties createStreamConfiguration(String appId, Serde serde) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass());
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableCommitIntervalMs());
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableMaxByteBufferMB() * 1024 * 1024);
properties.put(StreamsConfig.STATE_DIR_CONFIG, KafkaStreamsConfigurationConstants.stateStoreLocation);
return properties;
}
错误:
2022-02-16 14:19:39.663 WARN 9529 --- [ Thread-462] o.a.k.s.p.i.StateDirectory : Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS
2022-02-16 14:19:39.677 ERROR 9529 --- [ Thread-462] o.a.k.s.p.i.StateDirectory : Unable to obtain lock as state directory is already locked by another process
2022-02-16 14:19:39.702 ERROR 9529 --- [ Thread-462] f.t.s.c.- Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Localized Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Print Stack Trace : org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:186)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:681)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:657)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:567)
我认为这是因为
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}));
在 re-deploy 期间未被调用,因为 JVM 进程继续 运行。请尝试使用其他方式在您的应用程序重新部署时收到通知,例如使用 ServletContextListener
感谢@udalmik,我的问题得到了解决。
我通过从 DisposableBean 扩展我的 bean 解决了我的问题。
此外,我还有原型 bean。该解决方案不适用于我的原型 bean。
我正在为原型和单例 bean 编写解决方案。
// For Singleton Bean
@Service
public class PersonSingletonBean implements DisposableBean {
@Override
public void destroy() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}
}
// For PrototypeBean
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PersonPrototypeBean implements DisposableBean {
@Override
public void destroy() {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}
}
@Service
public class PersonPrototypeBeanList implements DisposableBean {
private final List<PersonPrototypeBean> personPrototypeBeanList = Collections.synchronizedList(new ArrayList<>());
public void addToPersonPrototypeBeanList(PersonPrototypeBean personPrototypeBean) {
personPrototypeBeanList.add(personPrototypeBean);
}
public void destroy() throws Exception {
synchronized (personPrototypeBeanList) {
for (PersonPrototypeBean personPrototypeBean : personPrototypeBeanList) {
if (personPrototypeBean != null) {
((DisposableBean) personPrototypeBean).destroy();
}
}
personPrototypeBeanList.clear();
}
}
}
我在我的项目中使用 kafka 流。我将我的项目编译为 war 和 运行 它在 tomcat.
我的项目如我所愿地运行,没有任何错误。如果我先停止 tomcat 然后启动它,它可以正常工作。 但是,如果我在不停止 tomcat 的情况下重新部署(取消部署和部署)服务,我会开始收到错误。我研究的时候有资料说tomcat缓存了老版本的服务。即使我应用了一些解决方案,我也无法找到解决方案。如果你能帮助我,我将不胜感激。
我想再说一遍。我的代码块正常工作。如果我 运行 在 tomcat 中第一次使用该服务,我不会收到错误消息。或者,如果我完全关闭 tomcat 并重新启动它,我不会收到错误消息。但是,如果我在不停止 tomcat 的情况下重新部署(取消部署和部署)服务,我会开始收到错误消息。
我在下面分享一个小代码块。
Properties streamConfiguration = kafkaStreamsConfiguration.createStreamConfiguration(createKTableGroupId(), new AppSerdes.DataWrapperSerde());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, DataWrapper> kTableDataWrapper = streamsBuilder.table(topicAction.getTopicName());
KTable<String, DataWrapper> kTableWithStore = kTableDataWrapper.filter((key, dataWrapper) -> key != null && dataWrapper != null, Materialized.as(createStoreName()));
kTableWithStore.toStream().filter((key, dataWrapper) -> // Filter)
.mapValues((ValueMapperWithKey<String, DataWrapper, Object>) (key, dataWrapper) -> {
// Logics
})
.to(createOutputTopicName());
this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamConfiguration);
this.kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}));
public Properties createStreamConfiguration(String appId, Serde serde) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass());
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableCommitIntervalMs());
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableMaxByteBufferMB() * 1024 * 1024);
properties.put(StreamsConfig.STATE_DIR_CONFIG, KafkaStreamsConfigurationConstants.stateStoreLocation);
return properties;
}
错误:
2022-02-16 14:19:39.663 WARN 9529 --- [ Thread-462] o.a.k.s.p.i.StateDirectory : Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS
2022-02-16 14:19:39.677 ERROR 9529 --- [ Thread-462] o.a.k.s.p.i.StateDirectory : Unable to obtain lock as state directory is already locked by another process
2022-02-16 14:19:39.702 ERROR 9529 --- [ Thread-462] f.t.s.c.- Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Localized Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Print Stack Trace : org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:186)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:681)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:657)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:567)
我认为这是因为
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}));
在 re-deploy 期间未被调用,因为 JVM 进程继续 运行。请尝试使用其他方式在您的应用程序重新部署时收到通知,例如使用 ServletContextListener
感谢@udalmik,我的问题得到了解决。
我通过从 DisposableBean 扩展我的 bean 解决了我的问题。
此外,我还有原型 bean。该解决方案不适用于我的原型 bean。 我正在为原型和单例 bean 编写解决方案。
// For Singleton Bean
@Service
public class PersonSingletonBean implements DisposableBean {
@Override
public void destroy() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}
}
// For PrototypeBean
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PersonPrototypeBean implements DisposableBean {
@Override
public void destroy() {
if (kafkaStreams != null) {
kafkaStreams.close();
}
}
}
@Service
public class PersonPrototypeBeanList implements DisposableBean {
private final List<PersonPrototypeBean> personPrototypeBeanList = Collections.synchronizedList(new ArrayList<>());
public void addToPersonPrototypeBeanList(PersonPrototypeBean personPrototypeBean) {
personPrototypeBeanList.add(personPrototypeBean);
}
public void destroy() throws Exception {
synchronized (personPrototypeBeanList) {
for (PersonPrototypeBean personPrototypeBean : personPrototypeBeanList) {
if (personPrototypeBean != null) {
((DisposableBean) personPrototypeBean).destroy();
}
}
personPrototypeBeanList.clear();
}
}
}