ZeroMQ重传pub-sub模型
ZeroMQ retransmitting pub-sub model
我正在尝试使用 ZeroMQ javascript 绑定来实现重新传输的 pub-sub backbone。
大意为:
- 入站模块向 backbone 宣布自己,backbone 订阅它们并在其自己的发布者套接字上重新传输
- 出站模块订阅backbone
我 运行 遇到了需要从单个线程使用的 pub 套接字的问题。
我当前的代码是这样的:
async function listen(name: string, sub: zmq.Subscriber, pub: zmq.Publisher) {
let id = 0;
for await (const [topic, msg] of sub) {
console.log(`BACKBONE | ${name} | received a message id: ${++id} related to: ${topic.toString()} containing message: ${msg.toString()}`);
await pub.send([topic, msg]);
}
}
为每个入站模块实例化,但当然它们会在 pub.send
上发生冲突。
我写了一个队列来序列化套接字访问,解决了这个问题。
import * as zmq from "zeromq"
export class PublisherQueue {
private queue: Buffer[][] = []
constructor(private publisher: zmq.Publisher) { }
private static toBuffer(value: Buffer | string): Buffer {
if (value instanceof Buffer) {
return value as Buffer;
} else {
return Buffer.from(value as string);
}
}
send(topic: Buffer | string, msg: Buffer | string) {
this.queue.push([PublisherQueue.toBuffer(topic), PublisherQueue.toBuffer(msg)]);
}
async run() {
while (true) {
if (this.queue.length > 0) {
let msg = this.queue.shift();
if (msg !== undefined) {
await this.publisher.send(msg);
}
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
}
}
我正在尝试使用 ZeroMQ javascript 绑定来实现重新传输的 pub-sub backbone。
大意为:
- 入站模块向 backbone 宣布自己,backbone 订阅它们并在其自己的发布者套接字上重新传输
- 出站模块订阅backbone
我 运行 遇到了需要从单个线程使用的 pub 套接字的问题。
我当前的代码是这样的:
async function listen(name: string, sub: zmq.Subscriber, pub: zmq.Publisher) {
let id = 0;
for await (const [topic, msg] of sub) {
console.log(`BACKBONE | ${name} | received a message id: ${++id} related to: ${topic.toString()} containing message: ${msg.toString()}`);
await pub.send([topic, msg]);
}
}
为每个入站模块实例化,但当然它们会在 pub.send
上发生冲突。
我写了一个队列来序列化套接字访问,解决了这个问题。
import * as zmq from "zeromq"
export class PublisherQueue {
private queue: Buffer[][] = []
constructor(private publisher: zmq.Publisher) { }
private static toBuffer(value: Buffer | string): Buffer {
if (value instanceof Buffer) {
return value as Buffer;
} else {
return Buffer.from(value as string);
}
}
send(topic: Buffer | string, msg: Buffer | string) {
this.queue.push([PublisherQueue.toBuffer(topic), PublisherQueue.toBuffer(msg)]);
}
async run() {
while (true) {
if (this.queue.length > 0) {
let msg = this.queue.shift();
if (msg !== undefined) {
await this.publisher.send(msg);
}
} else {
await new Promise(resolve => setTimeout(resolve, 50));
}
}
}
}