PUB/SUB Api 不回发消息,可能是什么问题?
PUB/SUB Api doesn't send messages back, what might be the problem?
我几乎完成了 pub/sub 假服务器,请求用户密码和电子邮件(来自客户端),将此信息与数据库和 returns 数据进行比较。它有 'api_in' 和 'api_out' 帧,然后是 JSON。
发布者毫不费力地获取并处理所有信息,但它似乎没有将任何内容发送回客户端(订阅者),我不知道为什么,因为它连接到后续端口。
而且我知道这个实现不是经典的 PUB/SUB 模式,但这是这样做的先决条件。
我尝试了不同的 pub/sub 选项,但没有任何改变。
服务器
let zmq = require('zeromq');
const sqlite3 = require('sqlite3').verbose();
const DBSOURCE = "./db.sqlite";
let db = new sqlite3.Database(DBSOURCE, (err) => {
if(err) {
console.error(err.message);
throw err;
} else {
console.log('Connected to SQLite database');
db.run(`CREATE TABLE users (
user_id INTEGER,
email TEXT,
passw TEXT)`,
(err) => {
if (err) {
// Table already created
} else {
// Creating rows
let insert = 'INSERT INTO users (user_id, email, passw) VALUES (?,?,?)';
db.run(insert, [123098, 'phillCollins@gmail.com','5502013']);
db.run(insert, [42424242,'dukenukem3d@mustdie.com','RodriguesShallLiveLong']);
db.run(insert, [5,'yourchick@yandex.ru','semolinaAndPain666']);
}
})
}
});
const args = require('minimist')(process.argv.slice(2));
const pubSocket = zmq.socket('pub', null);
pubSocket.bindSync(`tcp://127.0.0.1:${args['pub']}`);
const subSocket = zmq.socket('sub', null);
subSocket.subscribe('api_in');
subSocket.on('message', function(data) {
let message = data.toString().replace(/api_in/g, '');
let mes = JSON.parse(message);
let api_out = 'api_out';
let errorWrongPWD = 'WRONG_PWD';
let errorWrongFormat = 'WRONG_FORMAT';
if(mes.type = 'login') {
db.get(`SELECT user_id from users WHERE email = ? and passw = ?`, [mes.email, mes.pwd], function(err, row) {
if(err) {
console.log(err);
} else {
if(row) {
let msg = {
msg_id: mes.msg_id,
user_id: row.user_id,
status: 'ok'
}
let outMessage = api_out + JSON.stringify(msg);
console.log(outMessage);
subSocket.send(outMessage);
} else {
let msg = {
msg_id: mes.msg_id,
status: 'error',
error: mes.email == '' || mes.pwd == '' ? errorWrongFormat : errorWrongPWD
}
console.log(msg);
let outMessage = api_out + JSON.stringify(msg);
subSocket.send(outMessage);
}
}
});
}
});
subSocket.bindSync(`tcp://127.0.0.1:${args['sub']}`);
客户
let zmq = require('zeromq');
let uniqid = require('uniqid');
let readline = require('readline').createInterface({
input: process.stdin,
output: process.stdout
});
const args = require('minimist')(process.argv.slice(2));
const pubSocket = zmq.socket('pub', null);
let pubSocketTCP = `tcp://127.0.0.1:${args['sub']}`;
pubSocket.connect(pubSocketTCP);
const subSocket = zmq.socket('sub', null);
let subSocketTCP = `tcp://127.0.0.1:${args['pub']}`;
subSocket.connect(subSocketTCP);
let api_in = 'api_in';
let secondFrame = {
type: 'login',
email: '',
pwd: '',
msg_id: uniqid()
}
readline.question('What is your email? \n', (email) => {
secondFrame.email = email;
readline.question('What is your password \n', (pwd) => {
secondFrame.pwd = pwd;
let msg = api_in + JSON.stringify(secondFrame);
console.log(msg);
pubSocket.send(msg);
});
});
subSocket.subscribe('api_out');
subSocket.on('message', (response) => {
/* let res = response.toString().replace('api_out');
let responseParsed = JSON.parse(res);
console.log(responseParsed.status);
if(response.status == 'error') console.log(response.error); */
console.log(response);
});
我希望服务器端返回信息。
您的服务器正在尝试在子套接字上发送
subSocket.send(outMessage);
您不能在子套接字上发送。它应该在 pub socket 上发送。
嗯,首先,欢迎来到 Zen-of-Zero 域。 ZeroMQ 是一个强大的智能信号/消息传递工具,所以如果你注意它所有的内部优点,你将无法用它来做这些事情(在前进的道路上仍然无法避免噩梦)。如果对这个领域不熟悉,可以阅读 before diving into further details on subject, or re-use some of tricks posted here
Q : it doesn't seem to send anything back to the client (subscriber) and I don't know why
有两段代码,似乎同时使用了 PUB
和 SUB
Scalable Formal通信模式原型,但在如何配置这些原型方面存在一些问题。
服务器代码:
似乎尝试实例化 PUB
-archetype 并使用 .bindSync()
-方法和 cli 参数 args['pub']
用于接受普通和普通的连接tcp://
-transport-class.
在为第二个实例定义事件处理程序 .on( 'message', ... )
之后,作为 SUB
-原型,这个变成 .bindSync()
-配备单个接入点,使用 tcp://
-transport-class,使用 tcp://
-transport-class.
接收连接
如果您确实需要在 SUB
类似的原型上制作 .send()
,则必须使用 XSUB
替代方案,其中您可以在 PUB
端或 XPUB
端发送数据并使用实际有效负载执行一些技巧(有关 ZMQ_XPUB_MANUAL
模式功能和限制的详细信息,请参阅 API 文档对于 XPUB
端的一些更狂野的数据处理)
ZMQ_XSUB
Same as ZMQ_SUB
except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix may also be sent, but have no effect on subscription status.
客户端代码:
似乎实例化和 .connect()
客户端本地 PUB
和 SUB
原型通过 tcp://
-transport-class 到服务器端AccessPoints(两者都应将 ZMQ_LINGER 设置为 0,以避免无限挂起孤儿(依赖于版本的默认值没有其他解决方案,但对此有明确设置))。
可能的改进:
XPUB/XSUB
和 ZMQ_XPUB_MANUAL
可以解决通过 SUB
-archetype 发送
XPUB/SUB
和 ZMQ_XPUB_MANUAL
可能会解决通过 SUB
-原型发送的问题,而伪装要通过.subscribe()
-方法
PUB/SUB
严格通过本地 PUB
-archetype 实例制作所有 .send()
-s。
- 如果仍然丢失消息,请明确使用
ZMQ_SNDHWM
和 ZMQ_RCVHWM
参数
- 仅在成功完成
{ .bind() + .connect() }
方法后明确设置 .subscribe()
(系统地使用原始 zmq_errno()
和 zmq_strerror()
函数主动检测和修复潜在的碰撞州 )
- 可以请求仅使用已完成的连接(分布式系统对自主分布式(许多)代理执行的操作顺序具有零保证)使用
.setsockopt( ZMQ_IMMEDIATE, 1 )
我几乎完成了 pub/sub 假服务器,请求用户密码和电子邮件(来自客户端),将此信息与数据库和 returns 数据进行比较。它有 'api_in' 和 'api_out' 帧,然后是 JSON。 发布者毫不费力地获取并处理所有信息,但它似乎没有将任何内容发送回客户端(订阅者),我不知道为什么,因为它连接到后续端口。
而且我知道这个实现不是经典的 PUB/SUB 模式,但这是这样做的先决条件。
我尝试了不同的 pub/sub 选项,但没有任何改变。
服务器
let zmq = require('zeromq');
const sqlite3 = require('sqlite3').verbose();
const DBSOURCE = "./db.sqlite";
let db = new sqlite3.Database(DBSOURCE, (err) => {
if(err) {
console.error(err.message);
throw err;
} else {
console.log('Connected to SQLite database');
db.run(`CREATE TABLE users (
user_id INTEGER,
email TEXT,
passw TEXT)`,
(err) => {
if (err) {
// Table already created
} else {
// Creating rows
let insert = 'INSERT INTO users (user_id, email, passw) VALUES (?,?,?)';
db.run(insert, [123098, 'phillCollins@gmail.com','5502013']);
db.run(insert, [42424242,'dukenukem3d@mustdie.com','RodriguesShallLiveLong']);
db.run(insert, [5,'yourchick@yandex.ru','semolinaAndPain666']);
}
})
}
});
const args = require('minimist')(process.argv.slice(2));
const pubSocket = zmq.socket('pub', null);
pubSocket.bindSync(`tcp://127.0.0.1:${args['pub']}`);
const subSocket = zmq.socket('sub', null);
subSocket.subscribe('api_in');
subSocket.on('message', function(data) {
let message = data.toString().replace(/api_in/g, '');
let mes = JSON.parse(message);
let api_out = 'api_out';
let errorWrongPWD = 'WRONG_PWD';
let errorWrongFormat = 'WRONG_FORMAT';
if(mes.type = 'login') {
db.get(`SELECT user_id from users WHERE email = ? and passw = ?`, [mes.email, mes.pwd], function(err, row) {
if(err) {
console.log(err);
} else {
if(row) {
let msg = {
msg_id: mes.msg_id,
user_id: row.user_id,
status: 'ok'
}
let outMessage = api_out + JSON.stringify(msg);
console.log(outMessage);
subSocket.send(outMessage);
} else {
let msg = {
msg_id: mes.msg_id,
status: 'error',
error: mes.email == '' || mes.pwd == '' ? errorWrongFormat : errorWrongPWD
}
console.log(msg);
let outMessage = api_out + JSON.stringify(msg);
subSocket.send(outMessage);
}
}
});
}
});
subSocket.bindSync(`tcp://127.0.0.1:${args['sub']}`);
客户
let zmq = require('zeromq');
let uniqid = require('uniqid');
let readline = require('readline').createInterface({
input: process.stdin,
output: process.stdout
});
const args = require('minimist')(process.argv.slice(2));
const pubSocket = zmq.socket('pub', null);
let pubSocketTCP = `tcp://127.0.0.1:${args['sub']}`;
pubSocket.connect(pubSocketTCP);
const subSocket = zmq.socket('sub', null);
let subSocketTCP = `tcp://127.0.0.1:${args['pub']}`;
subSocket.connect(subSocketTCP);
let api_in = 'api_in';
let secondFrame = {
type: 'login',
email: '',
pwd: '',
msg_id: uniqid()
}
readline.question('What is your email? \n', (email) => {
secondFrame.email = email;
readline.question('What is your password \n', (pwd) => {
secondFrame.pwd = pwd;
let msg = api_in + JSON.stringify(secondFrame);
console.log(msg);
pubSocket.send(msg);
});
});
subSocket.subscribe('api_out');
subSocket.on('message', (response) => {
/* let res = response.toString().replace('api_out');
let responseParsed = JSON.parse(res);
console.log(responseParsed.status);
if(response.status == 'error') console.log(response.error); */
console.log(response);
});
我希望服务器端返回信息。
您的服务器正在尝试在子套接字上发送
subSocket.send(outMessage);
您不能在子套接字上发送。它应该在 pub socket 上发送。
嗯,首先,欢迎来到 Zen-of-Zero 域。 ZeroMQ 是一个强大的智能信号/消息传递工具,所以如果你注意它所有的内部优点,你将无法用它来做这些事情(在前进的道路上仍然无法避免噩梦)。如果对这个领域不熟悉,可以阅读
Q : it doesn't seem to send anything back to the client (subscriber) and I don't know why
有两段代码,似乎同时使用了 PUB
和 SUB
Scalable Formal通信模式原型,但在如何配置这些原型方面存在一些问题。
服务器代码:
似乎尝试实例化 PUB
-archetype 并使用 .bindSync()
-方法和 cli 参数 args['pub']
用于接受普通和普通的连接tcp://
-transport-class.
在为第二个实例定义事件处理程序 .on( 'message', ... )
之后,作为 SUB
-原型,这个变成 .bindSync()
-配备单个接入点,使用 tcp://
-transport-class,使用 tcp://
-transport-class.
如果您确实需要在 SUB
类似的原型上制作 .send()
,则必须使用 XSUB
替代方案,其中您可以在 PUB
端或 XPUB
端发送数据并使用实际有效负载执行一些技巧(有关 ZMQ_XPUB_MANUAL
模式功能和限制的详细信息,请参阅 API 文档对于 XPUB
端的一些更狂野的数据处理)
ZMQ_XSUB
Same asZMQ_SUB
except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix may also be sent, but have no effect on subscription status.
客户端代码:
似乎实例化和 .connect()
客户端本地 PUB
和 SUB
原型通过 tcp://
-transport-class 到服务器端AccessPoints(两者都应将 ZMQ_LINGER 设置为 0,以避免无限挂起孤儿(依赖于版本的默认值没有其他解决方案,但对此有明确设置))。
可能的改进:
XPUB/XSUB
和ZMQ_XPUB_MANUAL
可以解决通过SUB
-archetype 发送
XPUB/SUB
和ZMQ_XPUB_MANUAL
可能会解决通过SUB
-原型发送的问题,而伪装要通过.subscribe()
-方法PUB/SUB
严格通过本地PUB
-archetype 实例制作所有.send()
-s。- 如果仍然丢失消息,请明确使用
ZMQ_SNDHWM
和ZMQ_RCVHWM
参数 - 仅在成功完成
{ .bind() + .connect() }
方法后明确设置.subscribe()
(系统地使用原始zmq_errno()
和zmq_strerror()
函数主动检测和修复潜在的碰撞州 ) - 可以请求仅使用已完成的连接(分布式系统对自主分布式(许多)代理执行的操作顺序具有零保证)使用
.setsockopt( ZMQ_IMMEDIATE, 1 )