Android 中 Socket.io 的 RxKotlin

RxKotlin with Socket.io in Android

我想做的是监听套接字数据并将其转换为我的 UI 可以订阅此事件并在 UI

上进行更改的可观察字符串

到目前为止,我创建了一个 class SocketConnection 保持在匕首连接中正确发生并接收数据并能够正确处理接口,但想与 rxkotlin 一起应用。

使用Socket.io,kotlin

套接字连接class

 class SocketConnection : SocketStreamListener {

    private var socket: Socket? = null

   var responseSocket :ResponseHandler?= null
    companion object {
        var instance = SocketConnection()

    }

    override fun createSocket(socketQuery: SocketQuery): Socket? {
        try {
            val okHttpClient = UnsafeOkHttpClient.getUnsafeOkHttpClient()
            IO.setDefaultOkHttpWebSocketFactory(okHttpClient)
            IO.setDefaultOkHttpCallFactory(okHttpClient)
            val opts = IO.Options()
            opts.reconnection = false
            opts.callFactory = okHttpClient
            opts.webSocketFactory = okHttpClient
            opts.query = "userID=" + socketQuery.userID + "&token=" + socketQuery.token
            socket = IO.socket(CommonContents.BASE_API_LAYER, opts)
            L.d("Socket object created")
        } catch (e: URISyntaxException) {
            L.e("Error creating socket", e)
        }
        return socket
    }

    override fun createSocketListener(socket: Socket) {
        L.d("inside the socket Listner")
        socket.connect()?.on(Socket.EVENT_CONNECT, {
            L.d("connected")
            listenSocketEvents()
            //socketDataListener()
            createMessageListener()

        })?.on(Socket.EVENT_DISCONNECT,
                {
                    L.d("disconnected")
                    return@on
                })

    }


    /**
     * function used to listen a socket chanel data
     */
    private fun listenSocketEvents() {


       /* socket?.on("1502", { args ->
      // This Will Work 
             L.d("Socket market depth event successfully")
            val socketData = args[0] as String
            L.d(socketData)
         //   instance.data = Observable.just(socketData)
            //data!!.doOnNext({ socketData })

            *//*
            data = args[0] as String
             for (i in 0 until arr.size) {
                 arr[i].socketStreamingData(data)
             }*//*

        })*/

    }

// This Will Not Work
    fun socketDataListener(): Observable<String>{
      return Observable.create({
          subscibe ->
         // L.d("Socket market depth event successfully")
            socket?.on("1502", { args ->
                L.d("Socket market depth event successfully")
                val socketData = args[0] as String
                subscibe.onNext(socketData)
            })

        })
    }

  }

存储库

fun getSocketData(): Observable<String> {
   // L.e("" + SocketConnection.instance.socketDataListener())
    return SocketConnection.instance.createMessageListener()
}

视图模型

fun getSocketData(): Observable<String>{
    return groupRepository.getSocketData()
}

OnFragement (UI)

private fun getSocketUpdate(){
    subscribe(watchlistViewModel.getSocketData()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                L.d("SocketData :  " + it.count())

            }, {
                L.e("Error")
            }))
}

在此UI中使用一次性订阅方法进入基class。

请提前告诉我我做错了什么

与其在每次发送消息时都创建一个 Observable,我建议为此使用一个 Subject,因为它与 Socket 连接具有相似的 "nature"。

   val subject = PublishSubject.create<String>()
   ...

    fun listenSocketEvents() {
      socket?.on("1502") { args ->
        val socketData = args[0] as String
        subject.onNext(socketData)
      }
    }

    fun observable(): Observable<String>{
        return subject
    }

然后您可以通过(不包括存储库层等,您必须自己做)

收听主题的变化
private fun getSocketUpdate() {
    disposable = socketConnection.observable()
            .subscribeOn(Schedulers.io())
            .observeOn(...)
            .subscribe({...}, {...})
}

附带说明一下,您的单例实例不是您在 kotlin 中的做法。 您应该将 class 声明为 object SocketConnection.
,而不是在 companion object 中包含 instance 字段 这将自动为您提供所有单例功能。 (我不知道将单例与 socket.io 一起使用是否明智,但我假设您知道自己在做什么:-))