Spring integration backpressure error on splitting large stream
The goal is to stream a large json.gz file (4 GB compressed, about 12 GB uncompressed, 12 million lines) from a web server straight into a database without downloading it locally. Since the Spring Integration outbound gateway does not support gzip, I decompress the response myself using okhttp, which handles the gzip transparently:
InputStream body = response.body().byteStream(); // thanks okhttp
Reader reader = new InputStreamReader(body, StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(reader, bufferSize);
Flux<String> flux = Flux.fromStream(br.lines())
        .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
        .doAfterTerminate(() -> closeQuietly(closeables))
        .doOnError(t -> log.error(...));
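For context, the snippet above lives inside a custom transformer. Roughly, it has the shape sketched below; this is not my exact class, and the OkHttp request construction is an assumption for illustration:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.transformer.AbstractTransformer;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

// Illustrative sketch only: turns a message whose payload is a URL into a Flux of lines.
public class GzipToFluxTransformer extends AbstractTransformer {

    private static final Logger log = LoggerFactory.getLogger(GzipToFluxTransformer.class);

    private final OkHttpClient client = new OkHttpClient();
    private final int bufferSize = 64 * 1024; // assumed buffer size

    @Override
    protected Object doTransform(Message<?> message) {
        try {
            Response response = client.newCall(new Request.Builder()
                    .url(message.getPayload().toString())
                    .build())
                    .execute();
            // OkHttp adds Accept-Encoding: gzip itself and transparently decompresses the body.
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(response.body().byteStream(), StandardCharsets.UTF_8),
                    bufferSize);
            return Flux.fromStream(br.lines())
                    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
                    .doAfterTerminate(response::close);
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not open the gzip stream", e);
        }
    }
}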
In the integration flow:
.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())
But this is what I get:
2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException:
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1];
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...),
failedMessage=...}]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
The output channel is wired up at runtime as an unbounded queue polled by a bridge. This is for testing convenience, since the queue can be swapped for a DirectChannel in tests (a sketch of that test wiring follows the bridge configuration below).
@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow srcToSinkBridge() {
    return IntegrationFlows.from(streamingOutputChannel())
            .bridge(e -> e.poller(Pollers.fixedDelay(500)))
            .channel(repositoryInputChannel())
            .get();
}
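As noted above, a test can drop the bridge entirely by making the channel a DirectChannel. A minimal sketch of that wiring, using a literal bean name in place of the placeholder:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

// Test wiring sketch: "streamingOutputChannel" is an illustrative literal name
// standing in for the placeholder-resolved name used in the real configuration.
@Configuration
public class DirectChannelTestConfig {

    // A DirectChannel dispatches each message to the downstream flow synchronously
    // on the sender's thread, so no poller and no bridge are needed.
    @Bean
    public MessageChannel streamingOutputChannel() {
        return new DirectChannel();
    }
}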
I have a couple of doubts.
- I'm not sure the dynamic binding with SpEL in the bean name works, and I don't know how to verify it (one way to check is sketched right after this list).
- Since the queue is unbounded, the only explanation I can come up with is that the polling is not fast enough. However, the exception suggests the splitter is having trouble keeping up.
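One way to verify the first doubt, as a sketch (class and output are illustrative): list the bean names of all pollable channels at startup and see what the placeholder resolved to.

import javax.annotation.PostConstruct;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.PollableChannel;
import org.springframework.stereotype.Component;

// Hypothetical helper: prints every PollableChannel bean name at startup so you
// can see whether the placeholder in @Bean(name = "...") resolved as expected.
@Component
public class ChannelNameLogger {

    private final ApplicationContext applicationContext;

    public ChannelNameLogger(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void logChannelBeanNames() {
        for (String name : applicationContext.getBeanNamesForType(PollableChannel.class)) {
            System.out.println("PollableChannel bean: " + name);
        }
    }
}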
The problem is the log statement! It uses a wire tap, which changes the splitter's output channel to a DirectChannel, and that defeats the logic in AbstractMessageSplitter:
boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;
Quoting the documentation:
Starting with version 5.0, ... if Splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator and the output channel is subscribed to this Flux for back-pressure based splitting on downstream flow demand.
The working code is below. Simply moving the log statement from right after the splitter to the end of the flow fixed the back-pressure problem:
IntegrationFlows.from(inputChannel)
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
.handle(new GzipToFluxTransformer(...))
.transform((Flux<String> payload) -> payload
.onBackpressureBuffer(getOnBackpressureBufferSize(),
s -> log.error("Buffer overrun!")))
.split()
.channel(c -> c.flux(outputChannel))
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.get();
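The key change is .channel(c -> c.flux(outputChannel)): in the Java DSL this registers a FluxMessageChannel, which is the ReactiveStreamsSubscribableChannel implementation the splitter checks for. For reference, a minimal sketch of declaring such a channel as an explicit bean (class and bean names here are illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ReactiveChannelConfig {

    // FluxMessageChannel implements ReactiveStreamsSubscribableChannel, so a splitter
    // writing to this channel emits a back-pressure-aware Flux instead of pushing
    // every split item eagerly.
    @Bean
    public MessageChannel splitterOutputChannel() {
        return new FluxMessageChannel();
    }
}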
I have opened issue 2302 on the Spring Integration GitHub.