如何使用 nanomsg 在 C 端和 Python 端之间设置 Pub/Sub?
How to setup a Pub/Sub in nanomsg between a C and Python sides?
我正在尝试学习 nanomsg 库。
我正在使用 C 版本和 Python 版本的代码示例。我尝试用 Python 脚本订阅 C 服务,但没有任何反应。
这是我的两个代码:
Python 订阅者
# Subscriber attempt from the question, reproduced as posted (not working).
from __future__ import print_function
from nanomsg import Socket, PAIR, PUB
# NOTE(review): PAIR is not the protocol that matches the C side's NN_PUB
# socket; receiving its broadcasts requires a SUB socket.
s2 = Socket(PAIR)
# NOTE(review): the original indentation was lost, so the extent of the
# loop body cannot be determined from here - confirm against the source.
while(True):
# NOTE(review): connecting once, outside of any loop, is sufficient.
s2.connect('tcp://127.0.0.1:5555')
# NOTE(review): the script only sends; nothing here calls recv(), so
# messages published by the C server are never read.
s2.send(b'hello nanomsg #1')
s2.send(b'hello nanomsg #2')
s2.close()
C代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <netinet/in.h> /* For htonl and ntohl */
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
/* Publisher side: runs forever.
 *
 * Opens an NN_PUB socket, binds it to `url`, and then publishes an 8-byte
 * message every 10 seconds: the current time in seconds followed by the
 * socket's NN_STAT_CURRENT_CONNECTIONS statistic, each as a 32-bit value
 * converted with htonl().
 *
 * Returns -1 if the socket cannot be created or bound; otherwise the
 * publish loop never terminates.
 */
int server(const char *url)
{
    int sock = nn_socket (AF_SP, NN_PUB);

    if (sock < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    /* Binding listens on the address; subscribers that connect later are
       accepted without any further call from this program. */
    if (nn_bind (sock, url) < 0) {
        fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
        nn_close (sock);
        return (-1);
    }

    /* No explicit accept step: just keep writing messages. */
    while (1) {
        /* Sample the clock first, then the connection count, and convert
           both to network byte order before packing. */
        uint32_t now_be = htonl ((uint32_t) time (NULL));
        uint32_t peers_be = htonl (
            (uint32_t) nn_get_statistic (sock, NN_STAT_CURRENT_CONNECTIONS));
        uint8_t payload[2 * sizeof (uint32_t)];

        memcpy (payload, &now_be, sizeof (now_be));
        memcpy (payload + sizeof (now_be), &peers_be, sizeof (peers_be));

        if (nn_send (sock, payload, sizeof (payload), 0) < 0) {
            /* A failed send is reported for debugging and otherwise
               ignored; the loop carries on. */
            fprintf (stderr, "nn_send: %s (ignoring)\n",
                nn_strerror (nn_errno ()));
        }

        sleep(10);
    }

    /* NOTREACHED */
    nn_close (sock);
    return (-1);
}
/* Subscriber side: runs in a loop, displaying the content.
 *
 * Opens an NN_SUB socket, connects it to `url`, subscribes to every
 * message, and prints one line per received 8-byte message (time of day,
 * this process's pid, and the connection count sent by the publisher).
 *
 * Always returns -1: either setup failed, or the receive loop stopped
 * because nn_recv() failed or delivered a message of unexpected size.
 */
int client (const char *url)
{
    int fd;
    int rc;

    fd = nn_socket (AF_SP, NN_SUB);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    if (nn_connect (fd, url) < 0) {
        /* Label the error with the call that actually failed. */
        fprintf (stderr, "nn_connect: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    /* We want all messages, so just subscribe to the empty value. */
    if (nn_setsockopt (fd, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf (stderr, "nn_setsockopt: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    for (;;) {
        uint8_t msg[2 * sizeof (uint32_t)];
        char hhmmss[9];  /* HH:MM:SS\0 */
        uint32_t subs, secs;
        time_t t;

        rc = nn_recv (fd, msg, sizeof (msg), 0);
        if (rc < 0) {
            fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
            break;
        }
        /* The publisher always sends exactly two 32-bit fields. */
        if (rc != sizeof (msg)) {
            fprintf (stderr, "nn_recv: got %d bytes, wanted %d\n",
                rc, (int)sizeof (msg));
            break;
        }

        /* Unpack the two fields; they arrive in network byte order. */
        memcpy (&secs, msg, sizeof (secs));
        memcpy (&subs, msg + sizeof (secs), sizeof (subs));

        t = (time_t) ntohl(secs);
        strftime (hhmmss, sizeof (hhmmss), "%T", localtime (&t));

        printf ("%s <pid %u> There are %u clients connected.\n", hhmmss,
            (unsigned) getpid(), (unsigned) ntohl(subs));
    }

    nn_close (fd);
    return (-1);
}
/* Entry point.
 *
 *   <prog> <url> -s   run the publisher (server) on <url>
 *   <prog> <url>      run the subscriber (client) against <url>
 *
 * Any other argument combination prints a usage line and exits with
 * EXIT_FAILURE. Otherwise the exit status is EXIT_SUCCESS only when the
 * selected role returns 0.
 */
int main (int argc, char **argv)
{
    int status;
    const int want_server = (argc == 3) && (strcmp (argv[2], "-s") == 0);
    const int want_client = (argc == 2);

    /* Reject anything that is neither "<url> -s" nor "<url>". */
    if (!want_server && !want_client) {
        fprintf (stderr, "Usage: %s <url> [-s]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    status = want_server ? server (argv[1]) : client (argv[1]);

    if (status != 0) {
        exit (EXIT_FAILURE);
    }
    exit (EXIT_SUCCESS);
}
我运行 C代码
./pubsub_demo tcp://127.0.0.1:5555 -s
感谢您的帮助
C 代码看起来没有问题。它来自这里。
此外,还存在一个更简单的 C 语言 NN_PUB 服务器和 NN_SUB 客户端示例。
提供的 Python 代码存在一些问题。
1) 在 nanomsg 中,通信两端的行为(即"协议")必须相互匹配。为了接收来自 C 服务器 NN_PUB 套接字的广播,Python 端必须使用与之匹配的 SUB 套接字,而不是 PAIR。
2) 连接到 NN_PUB 套接字用 nn_bind() 绑定的同一个端点(格式为 transport-class://address:port)。这一步不需要放在循环中。
3) 必须为该套接字设置 SUB_SUBSCRIBE 选项。
4) SUB 套接字是用来监听(接收)的,它并不是为 .send() 设计的。
未经测试的 Python 程序在原则上可能如下所示:
# Minimal SUB-side client for the C NN_PUB server (untested sketch).
# import the socket class and the protocol/option constants from nanomsg
from nanomsg import Socket, PUB, SUB, SUB_SUBSCRIBE
# open Python's SUB socket matching the NN_PUB socket on the C side
s2 = Socket(SUB)
# NOTE(review): Socket(...) presumably returns a Socket object rather than
# an integer descriptor, so a ">= 0" check may not apply - confirm.
# connect the socket to the same endpoint as NN_PUB server (once, not in a loop)
ret1 = s2.connect('tcp://127.0.0.1:5555')
# NOTE(review): the original expected ret1 == 0; verify what connect()
# actually returns in nanomsg-python.
# subscribe to everything (the empty value matches all messages):
ret2 = s2.set_string_option(SUB, SUB_SUBSCRIBE, '')
# NOTE(review): the original expected ret2 == 0; verify what
# set_string_option() actually returns in nanomsg-python.
# receive messages:
while True:
    message = s2.recv()
    # show each received payload so it is visible that messages arrive
    print(message)
你也可以参考 Python 测试中的 PUB/SUB 示例。
希望对您有所帮助。
我正在尝试学习 nanomsg 库。
我正在使用 C 版本和 Python 版本的代码示例。我尝试用 Python 脚本订阅 C 服务,但没有任何反应。
这是我的两个代码:
Python 订阅者
# Subscriber attempt from the question, reproduced as posted (not working).
from __future__ import print_function
from nanomsg import Socket, PAIR, PUB
# NOTE(review): PAIR is not the protocol that matches the C side's NN_PUB
# socket; receiving its broadcasts requires a SUB socket.
s2 = Socket(PAIR)
# NOTE(review): the original indentation was lost, so the extent of the
# loop body cannot be determined from here - confirm against the source.
while(True):
# NOTE(review): connecting once, outside of any loop, is sufficient.
s2.connect('tcp://127.0.0.1:5555')
# NOTE(review): the script only sends; nothing here calls recv(), so
# messages published by the C server are never read.
s2.send(b'hello nanomsg #1')
s2.send(b'hello nanomsg #2')
s2.close()
C代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <netinet/in.h> /* For htonl and ntohl */
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
/* Publisher side: runs forever.
 *
 * Opens an NN_PUB socket, binds it to `url`, and then publishes an 8-byte
 * message every 10 seconds: the current time in seconds followed by the
 * socket's NN_STAT_CURRENT_CONNECTIONS statistic, each as a 32-bit value
 * converted with htonl().
 *
 * Returns -1 if the socket cannot be created or bound; otherwise the
 * publish loop never terminates.
 */
int server(const char *url)
{
    int sock = nn_socket (AF_SP, NN_PUB);

    if (sock < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    /* Binding listens on the address; subscribers that connect later are
       accepted without any further call from this program. */
    if (nn_bind (sock, url) < 0) {
        fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
        nn_close (sock);
        return (-1);
    }

    /* No explicit accept step: just keep writing messages. */
    while (1) {
        /* Sample the clock first, then the connection count, and convert
           both to network byte order before packing. */
        uint32_t now_be = htonl ((uint32_t) time (NULL));
        uint32_t peers_be = htonl (
            (uint32_t) nn_get_statistic (sock, NN_STAT_CURRENT_CONNECTIONS));
        uint8_t payload[2 * sizeof (uint32_t)];

        memcpy (payload, &now_be, sizeof (now_be));
        memcpy (payload + sizeof (now_be), &peers_be, sizeof (peers_be));

        if (nn_send (sock, payload, sizeof (payload), 0) < 0) {
            /* A failed send is reported for debugging and otherwise
               ignored; the loop carries on. */
            fprintf (stderr, "nn_send: %s (ignoring)\n",
                nn_strerror (nn_errno ()));
        }

        sleep(10);
    }

    /* NOTREACHED */
    nn_close (sock);
    return (-1);
}
/* Subscriber side: runs in a loop, displaying the content.
 *
 * Opens an NN_SUB socket, connects it to `url`, subscribes to every
 * message, and prints one line per received 8-byte message (time of day,
 * this process's pid, and the connection count sent by the publisher).
 *
 * Always returns -1: either setup failed, or the receive loop stopped
 * because nn_recv() failed or delivered a message of unexpected size.
 */
int client (const char *url)
{
    int fd;
    int rc;

    fd = nn_socket (AF_SP, NN_SUB);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    if (nn_connect (fd, url) < 0) {
        /* Label the error with the call that actually failed. */
        fprintf (stderr, "nn_connect: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    /* We want all messages, so just subscribe to the empty value. */
    if (nn_setsockopt (fd, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf (stderr, "nn_setsockopt: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    for (;;) {
        uint8_t msg[2 * sizeof (uint32_t)];
        char hhmmss[9];  /* HH:MM:SS\0 */
        uint32_t subs, secs;
        time_t t;

        rc = nn_recv (fd, msg, sizeof (msg), 0);
        if (rc < 0) {
            fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
            break;
        }
        /* The publisher always sends exactly two 32-bit fields. */
        if (rc != sizeof (msg)) {
            fprintf (stderr, "nn_recv: got %d bytes, wanted %d\n",
                rc, (int)sizeof (msg));
            break;
        }

        /* Unpack the two fields; they arrive in network byte order. */
        memcpy (&secs, msg, sizeof (secs));
        memcpy (&subs, msg + sizeof (secs), sizeof (subs));

        t = (time_t) ntohl(secs);
        strftime (hhmmss, sizeof (hhmmss), "%T", localtime (&t));

        printf ("%s <pid %u> There are %u clients connected.\n", hhmmss,
            (unsigned) getpid(), (unsigned) ntohl(subs));
    }

    nn_close (fd);
    return (-1);
}
/* Entry point.
 *
 *   <prog> <url> -s   run the publisher (server) on <url>
 *   <prog> <url>      run the subscriber (client) against <url>
 *
 * Any other argument combination prints a usage line and exits with
 * EXIT_FAILURE. Otherwise the exit status is EXIT_SUCCESS only when the
 * selected role returns 0.
 */
int main (int argc, char **argv)
{
    int status;
    const int want_server = (argc == 3) && (strcmp (argv[2], "-s") == 0);
    const int want_client = (argc == 2);

    /* Reject anything that is neither "<url> -s" nor "<url>". */
    if (!want_server && !want_client) {
        fprintf (stderr, "Usage: %s <url> [-s]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    status = want_server ? server (argv[1]) : client (argv[1]);

    if (status != 0) {
        exit (EXIT_FAILURE);
    }
    exit (EXIT_SUCCESS);
}
我运行 C代码
./pubsub_demo tcp://127.0.0.1:5555 -s
感谢您的帮助
C 代码看起来没有问题。它来自这里。
此外,还存在一个更简单的 C 语言 NN_PUB 服务器和 NN_SUB 客户端示例。
提供的 Python 代码存在一些问题。
1) 在 nanomsg 中,通信两端的行为(即"协议")必须相互匹配。为了接收来自 C 服务器 NN_PUB 套接字的广播,Python 端必须使用与之匹配的 SUB 套接字,而不是 PAIR。
2) 连接到 NN_PUB 套接字用 nn_bind() 绑定的同一个端点(格式为 transport-class://address:port)。这一步不需要放在循环中。
3) 必须为该套接字设置 SUB_SUBSCRIBE 选项。
4) SUB 套接字是用来监听(接收)的,它并不是为 .send() 设计的。
未经测试的 Python 程序在原则上可能如下所示:
# Minimal SUB-side client for the C NN_PUB server (untested sketch).
# import the socket class and the protocol/option constants from nanomsg
from nanomsg import Socket, PUB, SUB, SUB_SUBSCRIBE
# open Python's SUB socket matching the NN_PUB socket on the C side
s2 = Socket(SUB)
# NOTE(review): Socket(...) presumably returns a Socket object rather than
# an integer descriptor, so a ">= 0" check may not apply - confirm.
# connect the socket to the same endpoint as NN_PUB server (once, not in a loop)
ret1 = s2.connect('tcp://127.0.0.1:5555')
# NOTE(review): the original expected ret1 == 0; verify what connect()
# actually returns in nanomsg-python.
# subscribe to everything (the empty value matches all messages):
ret2 = s2.set_string_option(SUB, SUB_SUBSCRIBE, '')
# NOTE(review): the original expected ret2 == 0; verify what
# set_string_option() actually returns in nanomsg-python.
# receive messages:
while True:
    message = s2.recv()
    # show each received payload so it is visible that messages arrive
    print(message)
你也可以参考 Python 测试中的 PUB/SUB 示例。
希望对您有所帮助。