如何使用 RxJava 发送字节流?

How to send stream of bytes using RxJava?

有这样的代码可以完美的向服务器传输一个字符串并读取响应字符串。

val thread = Thread {
        try {
            val port = 8888
            println("Try to open connection:$port")
            val socket = Socket("192.168.0.104", port)
            println("Connection is created")
            val pw = PrintWriter(socket.getOutputStream(), true)
            val br = BufferedReader(InputStreamReader(socket.getInputStream()))
            pw.println("Hello from client 1")
            Log.e("Server answer", br.readLine())
            pw.close()
            br.close()
            socket.close()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    thread.start()

我正在尝试使用 RxJava 做同样的事情

fun connectToServer() {
    val TAG = "Server answer"
        connectToServerWrapper()
            .observeOn(Schedulers.io())
            .subscribeBy (onSuccess = {
                Log.e(TAG, it)
            }, onError = {
                Log.e(TAG, it.toString())
            })
            .addTo(compositeDisposable)
}

fun connectToServerWrapper(): Single<String> {
    return Single.create { emitter ->
        val port = 8888
        println("Try to open connection:$port")
        val socket = Socket(SERVER_IP, port)
        println("Connection is created")
        val pw = PrintWriter(socket.getOutputStream(), true)
        val br = BufferedReader(InputStreamReader(socket.getInputStream()))
        pw.println("Hello from client 1")
        emitter.onSuccess(br.readLine())
        pw.close()
        br.close()
        socket.close()
    }
}

Android.os.NetworkOnMainThreadException来自onError()。我做错了什么?

你不应该在主线程中执行你的网络请求

fun connectToServer() {
    val TAG = "Server answer"
        connectToServerWrapper()
            .subscribeOn(Schedulers.io())
            .subscribeBy (onSuccess = {
                Log.e(TAG, it)
            }, onError = {
                Log.e(TAG, it.toString())
            })
            .addTo(compositeDisposable)
}