如何从redis订阅者发送数据到快速路由
How to send data from redis subscriber to express route
我有一个 redis pubsub 客户端,其中发布者在一个文件中,订阅者在另一个文件中,工作正常
我有 2 个控制器,一个处理“/”路由的主控制器和一个处理“/data”路由的数据控制器
在我的 redis 订阅者中,我想更新我不断从发布者那里获得的变量的状态
如何在两个控制器发出请求时将此状态发送给它们
我在做
app.get('/', (req, res) => {
c = redis.createClient()
c.on("message", (channel, message) => {
// Send data here
})
})
这看起来不是个好主意,它正在为对“/”端点的每个请求创建一个新的 CLIENT
我希望能够做到
// home controller file
app.get('/', (req, res) => {
res.json(state)
})
// data controller file
app.get('/data', (req, res) => {
res.json(state)
})
如何实现这个状态
经过一番研究,我决定使用Node原生的events module to solve this. This example uses ioredis而不是node_redis,但是原理是一样的
首先,我实例化了三个redis 客户端。一个用于常规数据库工作,一个发布者和一个订阅者
/* redis.js */
const Redis = require('ioredis');
const redis = new Redis();
const publisher = new Redis();
const subscriber = new Redis();
// redis is the defaut export
// publisher and subscriber are "named" exports
const client = (module.exports = redis);
client.publisher = publisher;
client.subscriber = subscriber;
接下来我们在节点中创建一个 EventEmitter
,每当订阅者从 redis 中的通道接收到消息时,它都会发出一个事件。
/* emitter.js */
const EventEmitter = require('events');
const { subscriber } = require('./redis');
const eventEmitter = new EventEmitter();
subscriber.subscribe('my-channel', err => {
if (err) { return console.log('Unable to subscribe to my-event channel') };
console.log('Subscription to my-event channel successful');
});
subscriber.on('message', (channel, message) => {
eventEmitter.emit('my-event', message);
});
module.exports = eventEmitter;
这里有两条路线。第一个处理在 redis 中设置字段的 PUT 请求,然后使用已更新的哈希键将消息发布到通道。第二个路由处理一个保持打开状态的 GET 请求(例如作为 SSE 连接的 EventSource)。它侦听来自发射器的事件,然后从 redis
发送更新密钥的数据
/* route.js*/
const express = require('express');
const redis = require('./redis');
const { publisher } = require('./redis');
const { eventEmitter } = require('./emitter');
const router = express.Router();
router.put('/content', async (req, res) => {
const { key, field, content } = req.body;
try {
await redis.hset(key, field, content);
res.sendStatus(200);
return publisher.publish('my-channel', key);
} catch(err) {
res.status(500).send(err.message);
}
});
router.get('/content-stream', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive'
});
res.write('\n');
const handleEvent = async key => {
try {
const query = await redis.hgetall(key);
res.write(`data: ${JSON.stringify(query)}\n\n`);
} catch(err) {
console.log('Something went wrong');
}
}
eventEmitter.addListener('my-event', handleEvent);
req.on('close', eventEmitter.removeListener('my-event', handleEvent));
module.exports = router;
这将有效地避免在每次连接时实例化新的 Redis 客户端。可能有更好的方法,但这对我有用。
我有一个 redis pubsub 客户端,其中发布者在一个文件中,订阅者在另一个文件中,工作正常
我有 2 个控制器,一个处理“/”路由的主控制器和一个处理“/data”路由的数据控制器
在我的 redis 订阅者中,我想更新我不断从发布者那里获得的变量的状态
如何在两个控制器发出请求时将此状态发送给它们
我在做
app.get('/', (req, res) => {
c = redis.createClient()
c.on("message", (channel, message) => {
// Send data here
})
})
这看起来不是个好主意,它正在为对“/”端点的每个请求创建一个新的 CLIENT
我希望能够做到
// home controller file
app.get('/', (req, res) => {
res.json(state)
})
// data controller file
app.get('/data', (req, res) => {
res.json(state)
})
如何实现这个状态
经过一番研究,我决定使用Node原生的events module to solve this. This example uses ioredis而不是node_redis,但是原理是一样的
首先,我实例化了三个redis 客户端。一个用于常规数据库工作,一个发布者和一个订阅者
/* redis.js */
const Redis = require('ioredis');
const redis = new Redis();
const publisher = new Redis();
const subscriber = new Redis();
// redis is the defaut export
// publisher and subscriber are "named" exports
const client = (module.exports = redis);
client.publisher = publisher;
client.subscriber = subscriber;
接下来我们在节点中创建一个 EventEmitter
,每当订阅者从 redis 中的通道接收到消息时,它都会发出一个事件。
/* emitter.js */
const EventEmitter = require('events');
const { subscriber } = require('./redis');
const eventEmitter = new EventEmitter();
subscriber.subscribe('my-channel', err => {
if (err) { return console.log('Unable to subscribe to my-event channel') };
console.log('Subscription to my-event channel successful');
});
subscriber.on('message', (channel, message) => {
eventEmitter.emit('my-event', message);
});
module.exports = eventEmitter;
这里有两条路线。第一个处理在 redis 中设置字段的 PUT 请求,然后使用已更新的哈希键将消息发布到通道。第二个路由处理一个保持打开状态的 GET 请求(例如作为 SSE 连接的 EventSource)。它侦听来自发射器的事件,然后从 redis
发送更新密钥的数据/* route.js*/
const express = require('express');
const redis = require('./redis');
const { publisher } = require('./redis');
const { eventEmitter } = require('./emitter');
const router = express.Router();
router.put('/content', async (req, res) => {
const { key, field, content } = req.body;
try {
await redis.hset(key, field, content);
res.sendStatus(200);
return publisher.publish('my-channel', key);
} catch(err) {
res.status(500).send(err.message);
}
});
router.get('/content-stream', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive'
});
res.write('\n');
const handleEvent = async key => {
try {
const query = await redis.hgetall(key);
res.write(`data: ${JSON.stringify(query)}\n\n`);
} catch(err) {
console.log('Something went wrong');
}
}
eventEmitter.addListener('my-event', handleEvent);
req.on('close', eventEmitter.removeListener('my-event', handleEvent));
module.exports = router;
这将有效地避免在每次连接时实例化新的 Redis 客户端。可能有更好的方法,但这对我有用。