Android 中 Socket.io 的 RxKotlin
RxKotlin with Socket.io in Android
我想做的是监听套接字数据并将其转换为我的 UI 可以订阅此事件并在 UI
上进行更改的可观察字符串
到目前为止,我创建了一个 class SocketConnection 保持在匕首连接中正确发生并接收数据并能够正确处理接口,但想与 rxkotlin 一起应用。
使用Socket.io,kotlin
套接字连接class
class SocketConnection : SocketStreamListener {
private var socket: Socket? = null
var responseSocket :ResponseHandler?= null
companion object {
var instance = SocketConnection()
}
override fun createSocket(socketQuery: SocketQuery): Socket? {
try {
val okHttpClient = UnsafeOkHttpClient.getUnsafeOkHttpClient()
IO.setDefaultOkHttpWebSocketFactory(okHttpClient)
IO.setDefaultOkHttpCallFactory(okHttpClient)
val opts = IO.Options()
opts.reconnection = false
opts.callFactory = okHttpClient
opts.webSocketFactory = okHttpClient
opts.query = "userID=" + socketQuery.userID + "&token=" + socketQuery.token
socket = IO.socket(CommonContents.BASE_API_LAYER, opts)
L.d("Socket object created")
} catch (e: URISyntaxException) {
L.e("Error creating socket", e)
}
return socket
}
override fun createSocketListener(socket: Socket) {
L.d("inside the socket Listner")
socket.connect()?.on(Socket.EVENT_CONNECT, {
L.d("connected")
listenSocketEvents()
//socketDataListener()
createMessageListener()
})?.on(Socket.EVENT_DISCONNECT,
{
L.d("disconnected")
return@on
})
}
/**
* function used to listen a socket chanel data
*/
private fun listenSocketEvents() {
/* socket?.on("1502", { args ->
// This Will Work
L.d("Socket market depth event successfully")
val socketData = args[0] as String
L.d(socketData)
// instance.data = Observable.just(socketData)
//data!!.doOnNext({ socketData })
*//*
data = args[0] as String
for (i in 0 until arr.size) {
arr[i].socketStreamingData(data)
}*//*
})*/
}
// This Will Not Work
fun socketDataListener(): Observable<String>{
return Observable.create({
subscibe ->
// L.d("Socket market depth event successfully")
socket?.on("1502", { args ->
L.d("Socket market depth event successfully")
val socketData = args[0] as String
subscibe.onNext(socketData)
})
})
}
}
存储库
fun getSocketData(): Observable<String> {
// L.e("" + SocketConnection.instance.socketDataListener())
return SocketConnection.instance.createMessageListener()
}
视图模型
fun getSocketData(): Observable<String>{
return groupRepository.getSocketData()
}
OnFragement (UI)
private fun getSocketUpdate(){
subscribe(watchlistViewModel.getSocketData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
L.d("SocketData : " + it.count())
}, {
L.e("Error")
}))
}
在此UI中使用一次性订阅方法进入基class。
请提前告诉我我做错了什么
与其在每次发送消息时都创建一个 Observable
,我建议为此使用一个 Subject
,因为它与 Socket 连接具有相似的 "nature"。
val subject = PublishSubject.create<String>()
...
fun listenSocketEvents() {
socket?.on("1502") { args ->
val socketData = args[0] as String
subject.onNext(socketData)
}
}
fun observable(): Observable<String>{
return subject
}
然后您可以通过(不包括存储库层等,您必须自己做)
收听主题的变化
private fun getSocketUpdate() {
disposable = socketConnection.observable()
.subscribeOn(Schedulers.io())
.observeOn(...)
.subscribe({...}, {...})
}
附带说明一下,您的单例实例不是您在 kotlin 中的做法。
您应该将 class 声明为 object SocketConnection
.
,而不是在 companion object
中包含 instance
字段
这将自动为您提供所有单例功能。 (我不知道将单例与 socket.io 一起使用是否明智,但我假设您知道自己在做什么:-))
我想做的是监听套接字数据并将其转换为我的 UI 可以订阅此事件并在 UI
上进行更改的可观察字符串到目前为止,我创建了一个 class SocketConnection 保持在匕首连接中正确发生并接收数据并能够正确处理接口,但想与 rxkotlin 一起应用。
使用Socket.io,kotlin
套接字连接class
class SocketConnection : SocketStreamListener {
private var socket: Socket? = null
var responseSocket :ResponseHandler?= null
companion object {
var instance = SocketConnection()
}
override fun createSocket(socketQuery: SocketQuery): Socket? {
try {
val okHttpClient = UnsafeOkHttpClient.getUnsafeOkHttpClient()
IO.setDefaultOkHttpWebSocketFactory(okHttpClient)
IO.setDefaultOkHttpCallFactory(okHttpClient)
val opts = IO.Options()
opts.reconnection = false
opts.callFactory = okHttpClient
opts.webSocketFactory = okHttpClient
opts.query = "userID=" + socketQuery.userID + "&token=" + socketQuery.token
socket = IO.socket(CommonContents.BASE_API_LAYER, opts)
L.d("Socket object created")
} catch (e: URISyntaxException) {
L.e("Error creating socket", e)
}
return socket
}
override fun createSocketListener(socket: Socket) {
L.d("inside the socket Listner")
socket.connect()?.on(Socket.EVENT_CONNECT, {
L.d("connected")
listenSocketEvents()
//socketDataListener()
createMessageListener()
})?.on(Socket.EVENT_DISCONNECT,
{
L.d("disconnected")
return@on
})
}
/**
* function used to listen a socket chanel data
*/
private fun listenSocketEvents() {
/* socket?.on("1502", { args ->
// This Will Work
L.d("Socket market depth event successfully")
val socketData = args[0] as String
L.d(socketData)
// instance.data = Observable.just(socketData)
//data!!.doOnNext({ socketData })
*//*
data = args[0] as String
for (i in 0 until arr.size) {
arr[i].socketStreamingData(data)
}*//*
})*/
}
// This Will Not Work
fun socketDataListener(): Observable<String>{
return Observable.create({
subscibe ->
// L.d("Socket market depth event successfully")
socket?.on("1502", { args ->
L.d("Socket market depth event successfully")
val socketData = args[0] as String
subscibe.onNext(socketData)
})
})
}
}
存储库
fun getSocketData(): Observable<String> {
// L.e("" + SocketConnection.instance.socketDataListener())
return SocketConnection.instance.createMessageListener()
}
视图模型
fun getSocketData(): Observable<String>{
return groupRepository.getSocketData()
}
OnFragement (UI)
private fun getSocketUpdate(){
subscribe(watchlistViewModel.getSocketData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
L.d("SocketData : " + it.count())
}, {
L.e("Error")
}))
}
在此UI中使用一次性订阅方法进入基class。
请提前告诉我我做错了什么
与其在每次发送消息时都创建一个 Observable
,我建议为此使用一个 Subject
,因为它与 Socket 连接具有相似的 "nature"。
val subject = PublishSubject.create<String>()
...
fun listenSocketEvents() {
socket?.on("1502") { args ->
val socketData = args[0] as String
subject.onNext(socketData)
}
}
fun observable(): Observable<String>{
return subject
}
然后您可以通过(不包括存储库层等,您必须自己做)
收听主题的变化private fun getSocketUpdate() {
disposable = socketConnection.observable()
.subscribeOn(Schedulers.io())
.observeOn(...)
.subscribe({...}, {...})
}
附带说明一下,您的单例实例不是您在 kotlin 中的做法。
您应该将 class 声明为 object SocketConnection
.
,而不是在 companion object
中包含 instance
字段
这将自动为您提供所有单例功能。 (我不知道将单例与 socket.io 一起使用是否明智,但我假设您知道自己在做什么:-))