如何在 Socket.IO 事件中使用 RxJs
How to use RxJs with Socket.IO on event
我想在 socket.on('sense',function(data){});
中使用 RxJS。我对可用的文档很少而且对 RxJS 缺乏了解感到困惑和困惑。这是我的问题。
我有一个 distSensor.js
有一个函数 pingEnd()
function pingEnd(x){
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated.
}
在我的App.js里面我有
io.on('connection', function (socket) {
socket.on('sense', function (data) {
//console.log('sense from App4 was called ' + data);
});
});
sense 函数获取大量传感器数据,我想使用 RxJS 过滤这些数据,但我不知道接下来我应该做什么才能在这里使用 RxJs。任何指向正确文档或示例的指针都会有所帮助。
您可以像这样创建一个 Observable:
var senses = Rx.Observable.fromEventPattern(
function add (h) {
socket.on('sense',h);
}
);
然后像使用任何其他 Observable 一样使用 senses
。
您可以使用 Rx.Observable.fromEvent
(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md)。
下面是我如何使用 Bacon.js 做类似的事情,它有一个非常相似的 API:https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13
所以在 Bacon.js 中会像
io.on('connection', function(socket){
Bacon.fromEvent(socket, "sense")
.filter(function(data) { return true })
.forEach(function(data) { dealWith(data) })
})
而在 RxJs 中,您需要将 Bacon.fromEvent
替换为 Rx.Observable.fromEvent
。
我在使用 fromEvent 方法时遇到了一些奇怪的问题,所以我更喜欢创建自己的 Observable:
function RxfromIO (io, eventName) {
return Rx.Observable.create(observer => {
io.on(eventName, (data) => {
observer.onNext(data)
});
return {
dispose : io.close
}
});
然后我可以这样使用:
let $connection = RxfromIO(io, 'connection');
你可以使用 rxjs-dom,
Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver])
// an observer for when the socket is open
var openObserver = Rx.Observer.create(function(e) {
console.info('socket open');
// Now it is safe to send a message
socket.onNext('test');
});
// an observer for when the socket is about to close
var closingObserver = Rx.Observer.create(function() {
console.log('socket is about to close');
});
// create a web socket subject
socket = Rx.DOM.fromWebSocket(
'ws://echo.websocket.org',
null, // no protocol
openObserver,
closingObserver);
// subscribing creates the underlying socket and will emit a stream of incoming
// message events
socket.subscribe(
function(e) {
console.log('message: %s', e.data);
},
function(e) {
// errors and "unclean" closes land here
console.error('error: %s', e);
},
function() {
// the socket has been closed
console.info('socket closed');
}
);
我使用的 ES6 一种衬垫,使用 ES7 bind 语法:
(读 $
为 stream
)
import { Observable } from 'rxjs'
// create socket
const message$ = Observable.create($ => socket.on('message', ::$.next))
// translates to: Observable.create($ => socket.on('message', $.next.bind(this)))
// filter example
const subscription = message$
.filter(message => message.text !== 'spam')
//or .filter(({ text }) => text !== 'spam')
.subscribe(::console.log)
只需使用fromEvent()
。这是 Node.js 中的完整示例,但在浏览器中的工作原理相同。请注意,我使用 first()
和 takeUntil()
来防止内存泄漏:first()
仅侦听一个事件然后完成。现在对您收听的所有其他套接字事件使用 takeUntil()
,以便在断开连接时完成观察:
const app = require('express')();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
const Rx = require('rxjs/Rx');
connection$ = Rx.Observable.fromEvent(io, 'connection');
connection$.subscribe(socket => {
console.log(`Client connected`);
// Observables
const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);
// Subscriptions
message$.subscribe(data => {
console.log(`Got message from client with data: ${data}`);
io.emit('message', data); // Emit to all clients
});
disconnect$.subscribe(() => {
console.log(`Client disconnected`);
})
});
server.listen(3000);
我想在 socket.on('sense',function(data){});
中使用 RxJS。我对可用的文档很少而且对 RxJS 缺乏了解感到困惑和困惑。这是我的问题。
我有一个 distSensor.js
有一个函数 pingEnd()
function pingEnd(x){
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated.
}
在我的App.js里面我有
io.on('connection', function (socket) {
socket.on('sense', function (data) {
//console.log('sense from App4 was called ' + data);
});
});
sense 函数获取大量传感器数据,我想使用 RxJS 过滤这些数据,但我不知道接下来我应该做什么才能在这里使用 RxJs。任何指向正确文档或示例的指针都会有所帮助。
您可以像这样创建一个 Observable:
var senses = Rx.Observable.fromEventPattern(
function add (h) {
socket.on('sense',h);
}
);
然后像使用任何其他 Observable 一样使用 senses
。
您可以使用 Rx.Observable.fromEvent
(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md)。
下面是我如何使用 Bacon.js 做类似的事情,它有一个非常相似的 API:https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13
所以在 Bacon.js 中会像
io.on('connection', function(socket){
Bacon.fromEvent(socket, "sense")
.filter(function(data) { return true })
.forEach(function(data) { dealWith(data) })
})
而在 RxJs 中,您需要将 Bacon.fromEvent
替换为 Rx.Observable.fromEvent
。
我在使用 fromEvent 方法时遇到了一些奇怪的问题,所以我更喜欢创建自己的 Observable:
function RxfromIO (io, eventName) {
return Rx.Observable.create(observer => {
io.on(eventName, (data) => {
observer.onNext(data)
});
return {
dispose : io.close
}
});
然后我可以这样使用:
let $connection = RxfromIO(io, 'connection');
你可以使用 rxjs-dom,
Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver])
// an observer for when the socket is open
var openObserver = Rx.Observer.create(function(e) {
console.info('socket open');
// Now it is safe to send a message
socket.onNext('test');
});
// an observer for when the socket is about to close
var closingObserver = Rx.Observer.create(function() {
console.log('socket is about to close');
});
// create a web socket subject
socket = Rx.DOM.fromWebSocket(
'ws://echo.websocket.org',
null, // no protocol
openObserver,
closingObserver);
// subscribing creates the underlying socket and will emit a stream of incoming
// message events
socket.subscribe(
function(e) {
console.log('message: %s', e.data);
},
function(e) {
// errors and "unclean" closes land here
console.error('error: %s', e);
},
function() {
// the socket has been closed
console.info('socket closed');
}
);
我使用的 ES6 一种衬垫,使用 ES7 bind 语法:
(读 $
为 stream
)
import { Observable } from 'rxjs'
// create socket
const message$ = Observable.create($ => socket.on('message', ::$.next))
// translates to: Observable.create($ => socket.on('message', $.next.bind(this)))
// filter example
const subscription = message$
.filter(message => message.text !== 'spam')
//or .filter(({ text }) => text !== 'spam')
.subscribe(::console.log)
只需使用fromEvent()
。这是 Node.js 中的完整示例,但在浏览器中的工作原理相同。请注意,我使用 first()
和 takeUntil()
来防止内存泄漏:first()
仅侦听一个事件然后完成。现在对您收听的所有其他套接字事件使用 takeUntil()
,以便在断开连接时完成观察:
const app = require('express')();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
const Rx = require('rxjs/Rx');
connection$ = Rx.Observable.fromEvent(io, 'connection');
connection$.subscribe(socket => {
console.log(`Client connected`);
// Observables
const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);
// Subscriptions
message$.subscribe(data => {
console.log(`Got message from client with data: ${data}`);
io.emit('message', data); // Emit to all clients
});
disconnect$.subscribe(() => {
console.log(`Client disconnected`);
})
});
server.listen(3000);