Terminate a Spring Boot application that uses Kafka-Streams and MongoDB
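I have a Spring Boot application that uses Kafka-Streams. In detail, there is a stream that filters the messages it receives using the result of a query executed against MongoDB. The code is similar to the following.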
// to(...) returns void in this API, so the chain is not assigned to a variable.
kStreamBuilder.stream(Serdes.String(), Serdes.String(), inputTopic)
    .filter((s, message) -> service.hasSomeProperty(message))
    .to(Serdes.String(), Serdes.String(), outputTopic);
The method service.hasSomeProperty(message) calls a Mongo repository, which runs a query against a dedicated collection.
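If something goes wrong while communicating with Mongo, the exception is intercepted in the thread that processes the stream. The stream stops working, but the application does not shut down properly.
Specifically, the error we are facing is the following.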
Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:494)
at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:224)
at com.mongodb.connection.UsageTrackingInternalConnection.receiveMessage(UsageTrackingInternalConnection.java:96)
at com.mongodb.connection.DefaultConnectionPool$PooledConnection.receiveMessage(DefaultConnectionPool.java:440)
at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:112)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:176)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:216)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:207)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:113)
at com.mongodb.operation.FindOperation.call(FindOperation.java:516)
at com.mongodb.operation.FindOperation.call(FindOperation.java:510)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:435)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:408)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:510)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:81)
at com.mongodb.Mongo.execute(Mongo.java:836)
at com.mongodb.Mongo.execute(Mongo.java:823)
at com.mongodb.DBCursor.initializeCursor(DBCursor.java:870)
at com.mongodb.DBCursor.hasNext(DBCursor.java:142)
at com.mongodb.DBCursor.one(DBCursor.java:679)
at com.mongodb.DBCollection.findOne(DBCollection.java:833)
at com.mongodb.DBCollection.findOne(DBCollection.java:796)
at com.mongodb.DBCollection.findOne(DBCollection.java:743)
at org.springframework.data.mongodb.core.MongoTemplate$FindOneCallback.doInCollection(MongoTemplate.java:2179)
at org.springframework.data.mongodb.core.MongoTemplate$FindOneCallback.doInCollection(MongoTemplate.java:2163)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindOneInternal(MongoTemplate.java:1907)
... 31 more
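I would just like to know how to configure the Spring Boot application so that it shuts down when a communication error with Mongo occurs inside the Kafka Streams filter.
I know this is not the best approach, but I cannot refactor much of the code to use a GlobalKTable instead of the Mongo collection.
Thanks to all.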
Here is what I did in one of my applications, in the Spring component that manages the KafkaStreams instance and implements the InitializingBean and DisposableBean interfaces:
@Autowired
private ApplicationContext appContext;

private KafkaStreams streams;

// Called by Spring when the application context is closed.
@Override
public void destroy() throws Exception {
    if (streams.state().isRunning()) {
        streams.close();
    }
}

@Override
public void afterPropertiesSet() throws Exception {
    ... // streams setup
    // Invoked when a stream thread dies because of an uncaught exception.
    streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
        LOG.error("Unexpected error in stream processing for thread: " + thread, throwable);
        closeApp();
    });
    ... // streams start
}

// Closing the Spring context triggers destroy() on this bean.
private void closeApp() {
    ((ConfigurableApplicationContext) appContext).close();
}
So far this has worked well for me. You can choose to stop your application only when the exception comes from the Mongo connection.
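If you only want to shut down on Mongo failures, one option is to walk the cause chain of the throwable and check whether any cause comes from the driver's package. Below is a minimal sketch of that idea, not something taken from my application: the class name CauseFilter and the methods hasCauseFromPackage and closeOn are hypothetical, and matching on the "com.mongodb." class-name prefix is an assumption based on the stack trace in the question. Matching by name keeps the sketch free of a compile-time dependency on the driver.

```java
/**
 * Hypothetical helper: decides whether an uncaught error was caused by a
 * class from a given package and, if so, runs a shutdown action.
 */
final class CauseFilter {

    private CauseFilter() {
    }

    /**
     * Returns true if the error itself or any of its causes is an instance of
     * a class whose fully qualified name starts with packagePrefix.
     */
    static boolean hasCauseFromPackage(Throwable error, String packagePrefix) {
        int depth = 0;
        // The depth limit guards against a (rare) cyclic cause chain.
        for (Throwable t = error; t != null && depth < 64; t = t.getCause(), depth++) {
            if (t.getClass().getName().startsWith(packagePrefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a handler that always reports the error on stderr and runs
     * shutdown only when the cause chain matches packagePrefix.
     */
    static Thread.UncaughtExceptionHandler closeOn(String packagePrefix, Runnable shutdown) {
        return (thread, error) -> {
            System.err.println("Uncaught error in " + thread.getName() + ": " + error);
            if (hasCauseFromPackage(error, packagePrefix)) {
                shutdown.run();
            }
        };
    }
}
```

With the component above, and assuming a Kafka Streams version whose setUncaughtExceptionHandler accepts a Thread.UncaughtExceptionHandler (as the lambda in my code does), the registration would become streams.setUncaughtExceptionHandler(CauseFilter.closeOn("com.mongodb.", this::closeApp)). Keep in mind that for any other exception the stream thread still dies, so you may want to log or handle those cases separately.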