在 okHttp onFailure 回调中引发 OutOfMemory 异常

OutOfMemory Exception raised in okHttp onFailure callback

我正在使用 Kotlin 并编写一个函数来上传文件。在测试时,我观察到如果我执行以下操作,则会引发 OutOfMemory 异常并调用 onFailure 回调。

  1. Select 一个大小为 100 MB 的文件上传到服务器。通过调用 uploadFile.
  2. 上传
  3. 上传时,请断开互联网连接。 onFailure 被调用但出现异常 (timeout)。
  4. 再次启用互联网并尝试通过调用uploadFile函数再次上传。
  5. 重复第 2 步和第 3 步 1-2 次,应用程序崩溃。 onFailure 这次调用时出现 OutOfMemory 异常。

这是我的代码。

class UploadManager(
        private val fileTransferDataSource: IListFileTransferDataSource
) {

    private val uploadClient by lazy {
        OkHttpClient.Builder()
                .retryOnConnectionFailure(false)
                .writeTimeout(40, TimeUnit.SECONDS)
                .readTimeout(40, TimeUnit.SECONDS)
                .build()
    }

    suspend fun uploadFile(url: String,
                           fileUri: String,
                           downloadUrl: String?,
                           stream: InputStream,
                           callback: ((success: Boolean, filePath: String, url: String?, responseCode: Int?) -> Unit)? = null) {
        val baseUrl = FileTransferUtility.getBaseUrl(url)
        val authToken = fileTransferDataSource.getAuthenticationToken(baseUrl)
        if (baseUrl.isEmpty() || authToken.isEmpty()) {
            stream.close()
            callback?.let { it(false, fileUri, null, null) }
            return
        }

        kotlin.runCatching {
            val buf = ByteArray(stream.available())
            val bytesRead = stream.read(buf)
            stream.close()

            if (bytesRead == -1) {
                callback?.let { it(false, fileUri, null, null) }
                return@runCatching
            }
            val requestBody = create(FileTransferUtility.contentTypeStream.toMediaType(), buf)
            val request = requestBody.let {
                Request.Builder()
                        .url(url)
                        .post(it)
                        .addHeader(HttpConstants.Headers.AUTHORIZATION, HttpConstants.Values.AUTHORIZATION_TOKEN_BEARER_FORMAT.format(authToken))
                        .addHeader(HttpConstants.Headers.CONTENT_TYPE, HttpConstants.Values.APPLICATION_JSON)
                        .addHeader(HttpConstants.Headers.ACCEPT, HttpConstants.Values.APPLICATION_JSON)
                        .build()
            }

            if (request == null) {
                callback?.let { it(false, fileUri, null, null) }
                return@runCatching
            }
            
            uploadClient.newCall(request).enqueue(object : Callback {
                override fun onFailure(call: Call, e: IOException) {
                    call.cancel()
                    callback?.let { it(false, fileUri, null, null) }
                }

                override fun onResponse(call: Call, response: Response) {
                    response.body?.close()
                    callback?.let { it(response.code == 200, fileUri, downloadUrl, response.code) }
                }
            })
        }
    }

    //Restricting object creation for this class by making it singleton
    companion object : SingletonHolder<UploadManager, IListFileTransferDataSource>(::UploadManager)
}

但是,即使我上传多个大小为 100 MB 的文件并且它们可以毫无问题地上传,它也不会崩溃。仅当 onFailure 被多次触发时才会出现问题。我怀疑一些内部缓冲区在失败时没有得到释放。

我已经厌倦了以下。

  1. 正在取消调度程序
  2. 为日志级别 None 的日志添加拦截器。
  3. 在 onFailure 回调中显式取消调用

似乎没有什么可以解决这个问题。请帮助我确定内存泄漏。

堆栈跟踪:

java.io.IOException: canceled due to java.lang.OutOfMemoryError: Failed to allocate a 8208 byte allocation with 200 free bytes and 200B until OOM, target footprint 268435456, growth limit 268435456

0 = {StackTraceElement@16551} "okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:515)"
1 = {StackTraceElement@16552} "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)"
2 = {StackTraceElement@16553} "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)"
3 = {StackTraceElement@16554} "java.lang.Thread.run(Thread.java:919)"

我发现问题出在 val buf = ByteArray(inputStream.available())。它一次性创建了一个巨大的 ByteArray。如果我们必须上传多个更大的文件(在我的例子中,100MB),垃圾收集器需要一段时间来释放上次上传的内存。它导致了内存异常。我已经更改了我的代码,它不再是一个问题:

suspend fun uploadFile(
    url: String,
    fileUri: String,
    downloadUrl: String?,
    stream: InputStream,
    coroutineScope: CoroutineScope
): Flow<Result> = flow {
    kotlin.runCatching {
        val baseUrl = FileTransferUtility.getBaseUrl(url)
        val authToken = fileTransferDataSource.getAuthenticationToken(baseUrl)

        if (baseUrl.isEmpty() || authToken.isEmpty()) {
            stream.close()
            emit(Result.Failure(fileUri))
            return@flow
        }

        var connection: HttpURLConnection? = null
        val bufferedInputStream = stream.buffered()
        try {
            val fileSize = stream.available()
            if (fileSize == 0)
                throw IOException("Unable to read file $fileUri")

            connection = (URL(url).openConnection() as HttpURLConnection).also { conn ->
                addHeadersForUpload(conn, authToken, fileSize)
                conn.readTimeout = UPLOAD_READ_TIMEOUT
            }

            var progress = 0
            emit(Result.InProgress(fileUri, progress))
            BufferedOutputStream(connection.outputStream).use { uploadStream ->
                var bytesWritten = 0
                val buffer = ByteArray(DEFAULT_UPLOAD_BUFFER_SIZE)
                while (true) {
                    val size = bufferedInputStream.read(buffer)
                    if (size <= 0)
                        break

                    bytesWritten += size
                    progress = ((100f * bytesWritten) / fileSize).toInt()
                    emit(Result.InProgress(fileUri, progress))
                    uploadStream.write(buffer, 0, size)
                }
                uploadStream.flush()
            }
            val responseCode = connection.responseCode
            val isSuccess = responseCode == 200
            if (isSuccess) {
                if (progress < 100)
                    emit(Result.InProgress(fileUri, 100))
                emit(Result.Success(fileUri, downloadUrl, readResponseBody(connection)))
            } else {
                emit(Result.Failure(fileUri))
            }
        } catch (e: Exception) {
            emit(Result.Failure(fileUri))
        } finally {
            bufferedInputStream.close()
            connection?.disconnect()
        }
    }
}.flowOn(Dispatchers.IO)

private fun readResponseBody(connection: HttpURLConnection): String? {
    val builder = StringBuilder()
    val lineBreak = System.getProperty("line.separator")
    BufferedReader(connection.inputStream.reader()).use {
        val line = it.readLine() ?: return@use
        builder.append(line + lineBreak)
    }
    return builder.toString().trim(*lineBreak.toCharArray())
}