Observable.blockingForEach 导致 OutOfMemoryError?

Observable.blockingForEach causing OutOfMemoryError?

我在 AWS ECS/EC2 中有一个 Springboot Kotlin 应用程序 运行,其中包含我的前任编写的一些代码。它在服务器中 运行 一段时间后遇到 OutOfMemoryError。我不熟悉 ReactiveX 和线程,所以想寻求一些帮助。

我从错误日志看应该和Observable.blockingForEach方法有关?

listOfKeys 是 5 到 50 个项目的有限源,所以应该不是由于背压?

发生OutOfMemoryError时,我也粘贴了线程转储。

myMethod 中的代码

Observable.fromIterable(listOfKeys)
                .flatMap { key -> 
                    Maybe.fromCallable {
                        Pair(key, methodWhichCallsExternalAPI))
                    }
                        .toObservable()
                        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(4)))
                }
                .blockingForEach {
                    // some simple assignment of result to variables
                }

异常日志

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1367)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at io.reactivex.internal.schedulers.ExecutorScheduler.scheduleDirect(ExecutorScheduler.java:59)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:165)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at io.reactivex.internal.operators.observable.BlockingObservableIterable.iterator(BlockingObservableIterable.java:39)
at io.reactivex.Observable.blockingForEach(Observable.java:5183)
at com.myOrg.MyController.myMethod(MyController.kt:240)
at com.myOrg.MyController$$FastClassBySpringCGLIB$688575.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:61)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.dbs.localisemanagement.localise.LocaliseController$$EnhancerBySpringCGLIB$e102d4f.lokaliseProjectsS3(<generated>)
at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1064)

线程转储 - 3 个部分中的每一个都重复多次

at sun.misc.Unsafe.park(Native Method)
"pool-30968-thread-1" Id=36794 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394a0729
at java.lang.Thread.run(Thread.java:748)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@f0c2d74
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
-  locked java.lang.Object@5e78eb81
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.socketRead0(Native Method)
"pool-30982-thread-1" Id=36808 RUNNABLE
- java.util.concurrent.ThreadPoolExecutor$Worker@7d90150f
Number of locked synchronizers = 1
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:306)
at org.apache.http.pool.AbstractConnPool.get(AbstractConnPool.java:198)
-  locked org.apache.http.pool.AbstractConnPool@a0abf72
at org.apache.http.pool.AbstractConnPool.get(AbstractConnPool.java:253)
at org.apache.http.pool.AbstractConnPool.access0(AbstractConnPool.java:70)
at org.apache.http.pool.AbstractConnPool.getPoolEntryBlocking(AbstractConnPool.java:393)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7399fe63
at sun.misc.Unsafe.park(Native Method)
"pool-30987-thread-1" Id=36813 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@7399fe63
- java.util.concurrent.ThreadPoolExecutor$Worker@5b4dcfd1
Number of locked synchronizers = 1

最有可能的问题是重复创建了一个线程池,使用后没有释放

.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(4)))

方法运行的次数越多,它创建的废弃线程就越多。

您可能应该创建一个静态 Scheduler 并使用它:

static final ExecutorService pool =
    Executors.newFixedThreadPool(4);
static final Scheduler scheduler = Schedulers.from(pool);

稍后,在应用的生命周期结束后,调用 pool.shutdown() 释放线程。