用响应式编程重写事件发射器(信号量示例)
Rewrite event emitter with Reactive programming(Semaphore example)
我使用事件发射器作为同步原语。例如,我有一个 class 询问 Redis 中类似信号量的结构。如果设置了信号量,它会发出一个事件。代码如下:
// Modules and the shared Redis client used by the semaphore example.
// All names live in ONE `var` list: the original ended the list with `;`
// after EventEmitter, which turned `util` into an implicit global
// (a ReferenceError under strict mode).
var redis = require("redis"),
    async = require('async'),
    client = redis.createClient(),
    assert = require('assert'),
    EventEmitter = require('events').EventEmitter,
    util = require('util');

// `isMaster` is not referenced anywhere in the visible snippet.
// SEMAPHORE_ADDRESS is the Redis key passed to `client.set` when polling.
var isMaster = false,
    SEMAPHORE_ADDRESS = 'semaphore';
/**
 * Polls Redis once per second, trying to set the semaphore key with
 * `SET <key> true NX EX 5`, and emits 'crown' on itself each time the
 * reply is 'OK'.
 *
 * Must be invoked with `new`; the prototype is wired to EventEmitter
 * outside this function (via util.inherits).
 *
 * @constructor
 * @fires SemaphoreAsker#crown when Redis answers 'OK' to the SET.
 * @throws rethrows any error reported by `client.set` from inside the
 *         Redis callback (unchanged from the original behaviour).
 */
var SemaphoreAsker = function() {
  // Run the parent constructor so the emitter state is initialised.
  EventEmitter.call(this);

  var self = this;

  // One polling step; `next` is the continuation handed out by async.forever.
  var lifeCycle = function (next) {
    client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) {
      console.log('client');
      if (err !== null) { throw err; }
      if (val === 'OK') {
        self.emit('crown');
      } else {
        console.log('still a minion');
      }
      // The original never called `next`, so async.forever ran a single
      // iteration and polling stopped after the first attempt.
      next();
    });
  };

  async.forever(
    function(next) {
      setTimeout(
        lifeCycle.bind(null, next),
        1000
      );
    }
  );
};
// Give SemaphoreAsker the EventEmitter prototype methods (on/emit/...).
util.inherits(SemaphoreAsker, EventEmitter);

// Start an asker and report when it emits 'crown'.
new SemaphoreAsker().on(
  'crown',
  function announceMaster() {
    console.log('I`m master');
  }
);
可以用,但看起来有点重。是否可以用 BaconJS(RxJS 或其他任意响应式编程库)重写这个示例?
我使用基本的 Bacon.fromBinder 为此创建了自定义流。由于没有可运行的示例,下面的代码带有一些猜测成分,但希望对您有所帮助。
var redis = require("redis"),
client = redis.createClient(),
assert = require('assert'),
Bacon = require('bacon');
// Redis key used as the semaphore.
var SEMAPHORE_ADDRESS = 'semaphore';

/**
 * Creates a Bacon stream (via Bacon.fromBinder) that polls Redis every
 * second with `SET <key> true NX EX 5`.
 *
 * - Redis error      -> a Bacon.Error carrying the error is pushed.
 * - reply is 'OK'    -> a Bacon.Next with the value 'crown' is pushed.
 * - any other reply  -> nothing is pushed; polling simply continues.
 *
 * @returns the stream returned by Bacon.fromBinder.
 */
var SemaphoreAsker = function() {
  return Bacon.fromBinder(function (sink) {
    var intervalId = setInterval(pollRedis, 1000);

    function pollRedis() {
      client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) {
        if (err !== null) {
          sink(new Bacon.Error(err));
        } else if (val === 'OK') {
          sink(new Bacon.Next('crown'));
        }
        // The original called assert.fail() here, which would throw on every
        // poll where the reply is not 'OK' (presumably meaning the key is
        // already held by someone else). That is the normal waiting state,
        // so it is ignored.
      });
    }

    // Handed back to Bacon so the timer is stopped on unsubscribe.
    return function unsubscribe() {
      clearInterval(intervalId);
    };
  });
};
// Take only the first value from the stream, then report mastership.
SemaphoreAsker()
  .take(1)
  .onValue(function announceMaster() {
    console.log("I am master");
  });
以下应该在 RXJS 中工作:
// Observable-returning wrapper around the node-style `client.set`,
// bound to `client`.
var callback = Rx.Observable.fromNodeCallback(client.set, client);

// Every 1000 ms issue `SET <key> true NX EX 5`; keep only 'OK' replies
// and complete after the first one.
var source = Rx.Observable
  .interval(1000)
  .selectMany(function attemptLock() {
    return callback([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5]);
  })
  .filter(function isAcquired(reply) {
    return reply === 'OK';
  })
  .take(1);

source.subscribe(function onCrowned() {
  console.log("I am master");
});
如果您愿意另外引入 rx-node 模块,还可以使用以下方式保留事件发射器结构:
// Expose `source` as an event emitter: values are emitted as 'crown',
// errors as 'error' and completion as 'end'.
var emitter = RxNode.toEventEmitter(source, 'crown');
emitter.on('crown', function(){});
emitter.on('error', function(){});
emitter.on('end', function(){});
// rx-node's emitter does not subscribe to the observable until publish()
// is called; without it none of the handlers above ever fire.
emitter.publish();
@paulpdanies 的回答,但用 Bacon 重写了:
// Every 1000 ms call client.set(<key>, true, 'NX', 'EX', 1) through
// Bacon.fromNodeCallback; keep only 'OK' replies and end after the first.
var source = Bacon.interval(1000)
  .flatMap(function attemptLock() {
    return Bacon.fromNodeCallback(
      client,
      'set',
      [SEMAPHORE_ADDRESS, true, 'NX', 'EX', 1]
    );
  })
  .filter(function isAcquired(reply) {
    return reply === 'OK';
  })
  .take(1);

// Log the reply that passed the filter.
source.onValue(function logReply(reply) {
  console.log(reply);
});
我使用事件发射器作为同步原语。例如,我有一个 class 询问 Redis 中类似信号量的结构。如果设置了信号量,它会发出一个事件。代码如下:
// Modules and the shared Redis client used by the semaphore example.
// `util` is kept inside the same `var` list: the original terminated the
// list with `;` after EventEmitter, leaving `util` as an implicit global
// (a ReferenceError under strict mode).
var redis = require("redis"),
    async = require('async'),
    client = redis.createClient(),
    assert = require('assert'),
    EventEmitter = require('events').EventEmitter,
    util = require('util');

// `isMaster` is not referenced anywhere in the visible snippet.
// SEMAPHORE_ADDRESS is the Redis key passed to `client.set` when polling.
var isMaster = false,
    SEMAPHORE_ADDRESS = 'semaphore';
/**
 * Polls Redis once per second, trying to set the semaphore key with
 * `SET <key> true NX EX 5`, and emits 'crown' on itself each time the
 * reply is 'OK'.
 *
 * Must be invoked with `new`; the prototype is wired to EventEmitter
 * outside this function (via util.inherits).
 *
 * @constructor
 * @fires SemaphoreAsker#crown when Redis answers 'OK' to the SET.
 * @throws rethrows any error reported by `client.set` from inside the
 *         Redis callback (unchanged from the original behaviour).
 */
var SemaphoreAsker = function() {
  // Run the parent constructor so the emitter state is initialised.
  EventEmitter.call(this);

  var self = this;

  // One polling step; `next` is the continuation handed out by async.forever.
  var lifeCycle = function (next) {
    client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) {
      console.log('client');
      if (err !== null) { throw err; }
      if (val === 'OK') {
        self.emit('crown');
      } else {
        console.log('still a minion');
      }
      // Without this call async.forever never schedules another iteration,
      // so the original polled Redis only once.
      next();
    });
  };

  async.forever(
    function(next) {
      setTimeout(
        lifeCycle.bind(null, next),
        1000
      );
    }
  );
};
// Give SemaphoreAsker the EventEmitter prototype methods (on/emit/...).
util.inherits(SemaphoreAsker, EventEmitter);

// Start an asker and report when it emits 'crown'.
new SemaphoreAsker().on(
  'crown',
  function announceMaster() {
    console.log('I`m master');
  }
);
可以用,但看起来有点重。是否可以用 BaconJS(RxJS 或其他任意响应式编程库)重写这个示例?
我使用基本的 Bacon.fromBinder 为此创建了自定义流。由于没有可运行的示例,下面的代码带有一些猜测成分,但希望对您有所帮助。
var redis = require("redis"),
client = redis.createClient(),
assert = require('assert'),
Bacon = require('bacon');
// Redis key used as the semaphore.
var SEMAPHORE_ADDRESS = 'semaphore';

/**
 * Creates a Bacon stream (via Bacon.fromBinder) that polls Redis every
 * second with `SET <key> true NX EX 5`.
 *
 * - Redis error      -> a Bacon.Error carrying the error is pushed.
 * - reply is 'OK'    -> a Bacon.Next with the value 'crown' is pushed.
 * - any other reply  -> nothing is pushed; polling simply continues.
 *
 * @returns the stream returned by Bacon.fromBinder.
 */
var SemaphoreAsker = function() {
  return Bacon.fromBinder(function (sink) {
    var intervalId = setInterval(pollRedis, 1000);

    function pollRedis() {
      client.set([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5], function(err, val) {
        if (err !== null) {
          sink(new Bacon.Error(err));
        } else if (val === 'OK') {
          sink(new Bacon.Next('crown'));
        }
        // The original called assert.fail() here, which would throw on every
        // poll where the reply is not 'OK' (presumably meaning the key is
        // already held by someone else). That is the normal waiting state,
        // so it is ignored.
      });
    }

    // Handed back to Bacon so the timer is stopped on unsubscribe.
    return function unsubscribe() {
      clearInterval(intervalId);
    };
  });
};
// Take only the first value from the stream, then report mastership.
SemaphoreAsker()
  .take(1)
  .onValue(function announceMaster() {
    console.log("I am master");
  });
以下应该在 RXJS 中工作:
// Observable-returning wrapper around the node-style `client.set`,
// bound to `client`.
var callback = Rx.Observable.fromNodeCallback(client.set, client);

// Every 1000 ms issue `SET <key> true NX EX 5`; keep only 'OK' replies
// and complete after the first one.
var source = Rx.Observable
  .interval(1000)
  .selectMany(function attemptLock() {
    return callback([SEMAPHORE_ADDRESS, true, 'NX', 'EX', 5]);
  })
  .filter(function isAcquired(reply) {
    return reply === 'OK';
  })
  .take(1);

source.subscribe(function onCrowned() {
  console.log("I am master");
});
如果您愿意另外引入 rx-node 模块,还可以使用以下方式保留事件发射器结构:
// Expose `source` as an event emitter: values are emitted as 'crown',
// errors as 'error' and completion as 'end'.
var emitter = RxNode.toEventEmitter(source, 'crown');
emitter.on('crown', function(){});
emitter.on('error', function(){});
emitter.on('end', function(){});
// rx-node's emitter does not subscribe to the observable until publish()
// is called; without it none of the handlers above ever fire.
emitter.publish();
@paulpdanies 的回答,但用 Bacon 重写了:
// Every 1000 ms call client.set(<key>, true, 'NX', 'EX', 1) through
// Bacon.fromNodeCallback; keep only 'OK' replies and end after the first.
var source = Bacon.interval(1000)
  .flatMap(function attemptLock() {
    return Bacon.fromNodeCallback(
      client,
      'set',
      [SEMAPHORE_ADDRESS, true, 'NX', 'EX', 1]
    );
  })
  .filter(function isAcquired(reply) {
    return reply === 'OK';
  })
  .take(1);

// Log the reply that passed the filter.
source.onValue(function logReply(reply) {
  console.log(reply);
});