PUB/SUB Api 不回发消息,可能是什么问题?

PUB/SUB Api doesn't send messages back, what might be the problem?

我几乎完成了 pub/sub 假服务器,请求用户密码和电子邮件(来自客户端),将此信息与数据库和 returns 数据进行比较。它有 'api_in' 和 'api_out' 帧,然后是 JSON。 发布者毫不费力地获取并处理所有信息,但它似乎没有将任何内容发送回客户端(订阅者),我不知道为什么,因为它连接到后续端口。

而且我知道这个实现不是经典的 PUB/SUB 模式,但这是这样做的先决条件。

我尝试了不同的 pub/sub 选项,但没有任何改变。

服务器

let zmq = require('zeromq');
const sqlite3 = require('sqlite3').verbose();

const DBSOURCE = "./db.sqlite";

let db  = new sqlite3.Database(DBSOURCE, (err) => {
    if(err) {
        console.error(err.message);
        throw err;
    } else {
        console.log('Connected to SQLite database');
        db.run(`CREATE TABLE users (
        user_id INTEGER, 
        email TEXT,
        passw TEXT)`,
            (err) => {
                if (err) {
                    // Table already created
                } else {
                    // Creating rows
                    let insert = 'INSERT INTO users (user_id, email, passw) VALUES (?,?,?)';
                    db.run(insert, [123098, 'phillCollins@gmail.com','5502013']);
                    db.run(insert, [42424242,'dukenukem3d@mustdie.com','RodriguesShallLiveLong']);
                    db.run(insert, [5,'yourchick@yandex.ru','semolinaAndPain666']);

                }
            })
    }
});


const args = require('minimist')(process.argv.slice(2));

const pubSocket = zmq.socket('pub', null);

pubSocket.bindSync(`tcp://127.0.0.1:${args['pub']}`);

const subSocket = zmq.socket('sub', null);

subSocket.subscribe('api_in');

subSocket.on('message', function(data) {
    let message = data.toString().replace(/api_in/g, '');
    let mes = JSON.parse(message);

    let api_out = 'api_out';

    let errorWrongPWD = 'WRONG_PWD';
    let errorWrongFormat = 'WRONG_FORMAT';

    if(mes.type = 'login') {
        db.get(`SELECT user_id from users WHERE email = ? and passw = ?`, [mes.email, mes.pwd], function(err, row) {
            if(err) {
                console.log(err);
            } else {
                if(row) {
                    let msg = {
                        msg_id: mes.msg_id,
                        user_id: row.user_id,
                        status: 'ok'
                    }

                    let outMessage = api_out + JSON.stringify(msg);

                    console.log(outMessage);
                    subSocket.send(outMessage);
                } else {
                    let msg = {
                        msg_id: mes.msg_id,
                        status: 'error',
                        error: mes.email == '' || mes.pwd == '' ?  errorWrongFormat : errorWrongPWD
                    }
                    console.log(msg);

                    let outMessage = api_out + JSON.stringify(msg);

                    subSocket.send(outMessage);
                }
            }
        });
    }
});


subSocket.bindSync(`tcp://127.0.0.1:${args['sub']}`);

客户

let zmq = require('zeromq');
let uniqid = require('uniqid');

let readline = require('readline').createInterface({
    input: process.stdin,
    output: process.stdout
});

const args = require('minimist')(process.argv.slice(2));

const pubSocket = zmq.socket('pub', null);

let pubSocketTCP = `tcp://127.0.0.1:${args['sub']}`;

pubSocket.connect(pubSocketTCP);

const subSocket = zmq.socket('sub', null);

let subSocketTCP = `tcp://127.0.0.1:${args['pub']}`;

subSocket.connect(subSocketTCP);



let api_in = 'api_in';
let secondFrame = {
    type: 'login',
    email: '',
    pwd: '',
    msg_id: uniqid()
}

readline.question('What is your email? \n', (email) => {
    secondFrame.email = email;
    readline.question('What is your password \n', (pwd) => {
        secondFrame.pwd = pwd;
        let msg = api_in + JSON.stringify(secondFrame);
        console.log(msg);
        pubSocket.send(msg);
    });
});

subSocket.subscribe('api_out');

subSocket.on('message', (response) => {
/*     let res = response.toString().replace('api_out');
    let responseParsed = JSON.parse(res);
    console.log(responseParsed.status);
    if(response.status == 'error') console.log(response.error); */
    console.log(response);
}); 

我希望服务器端返回信息。

您的服务器正在尝试在子套接字上发送

subSocket.send(outMessage);

您不能在子套接字上发送。它应该在 pub socket 上发送。

嗯,首先,欢迎来到 Zen-of-Zero 域。 ZeroMQ 是一个强大的智能信号/消息传递工具,所以如果你注意它所有的内部优点,你将无法用它来做这些事情(在前进的道路上仍然无法避免噩梦)。如果对这个领域不熟悉,可以阅读 before diving into further details on subject, or re-use some of tricks posted here

Q : it doesn't seem to send anything back to the client (subscriber) and I don't know why

有两段代码,似乎同时使用了 PUBSUB Scalable Formal通信模式原型,但在如何配置这些原型方面存在一些问题。


服务器代码:

似乎尝试实例化 PUB-archetype 并使用 .bindSync()-方法和 cli 参数 args['pub'] 用于接受普通和普通的连接tcp://-transport-class.

在为第二个实例定义事件处理程序 .on( 'message', ... ) 之后,作为 SUB-原型,这个变成 .bindSync()-配备单个接入点,使用 tcp://-transport-class,使用 tcp://-transport-class.

接收连接

如果您确实需要在 SUB 类似的原型上制作 .send(),则必须使用 XSUB 替代方案,其中您可以在 PUB 端或 XPUB 端发送数据并使用实际有效负载执行一些技巧(有关 ZMQ_XPUB_MANUAL 模式功能和限制的详细信息,请参阅 API 文档对于 XPUB 端的一些更狂野的数据处理)

ZMQ_XSUB

Same as ZMQ_SUB except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix may also be sent, but have no effect on subscription status.


客户端代码:

似乎实例化和 .connect() 客户端本地 PUBSUB 原型通过 tcp://-transport-class 到服务器端AccessPoints(两者都应将 ZMQ_LINGER 设置为 0,以避免无限挂起孤儿(依赖于版本的默认值没有其他解决方案,但对此有明确设置))。


可能的改进:

  • XPUB/XSUBZMQ_XPUB_MANUAL 可以解决通过 SUB-archetype
  • 发送
  • XPUB/SUBZMQ_XPUB_MANUAL 可能会解决通过 SUB-原型发送的问题,而伪装要通过.subscribe()-方法
  • PUB/SUB 严格通过本地 PUB-archetype 实例制作所有 .send()-s。
  • 如果仍然丢失消息,请明确使用 ZMQ_SNDHWMZMQ_RCVHWM 参数
  • 仅在成功完成 { .bind() + .connect() } 方法后明确设置 .subscribe()(系统地使用原始 zmq_errno()zmq_strerror() 函数主动检测和修复潜在的碰撞州 )
  • 可以请求仅使用已完成的连接(分布式系统对自主分布式(许多)代理执行的操作顺序具有零保证)使用 .setsockopt( ZMQ_IMMEDIATE, 1 )