使用 mkfifo 在 linux 和 dotnet 之间进行进程间通信
inter-process communication between linux and dotnet using mkfifo
我们有一个用 c 编写的应用程序,它将 events/notifications 发送到一个用 c# 编写的应用程序。两个应用程序 运行 在同一台 linux 计算机上。
C应用程序:
C 应用程序是 Asterisk 我们修改了源代码(它是开源的)以便它可以将事件发送到我们的 dotnet 控制台应用程序。我们目前发送事件的方式很简单,就是将文本附加到文件中。例如,这就是我们发送新对等点 (ip-phone) 已连接的事件的方式:
// place this on chan_sip.c
// Example: 1-LN-48T6-E3C5-OFWT|10.0.0.103:5868|189.217.18.244|10216|Z 3.9.32144 r32121
if(!ast_sockaddr_isnull(&peer->addr))
{
// lock
ast_mutex_lock(&some_lock);
// write to file
FILE *pFile;
pFile=fopen("/var/log/asterisk/peer-subscriptions.txt", "a");
if(pFile==NULL) { perror("Error opening file."); }
else {
fprintf(pFile,"%s|%s|%s|%s|%s\n",
/* 1-LN-48T6-E3C5-OFWT */ peer->name,
/* 10.0.0.103:5868 */ pvt->initviasentby,
/* 189.217.18.244 */ ast_sockaddr_stringify_addr(&peer->addr),
/* 10216 */ ast_strdupa(ast_sockaddr_stringify_port(&peer->addr)),
/* Z 3.9.32144 */ peer->useragent
// Other:
// peer->fullcontact, // sip:1-LN-48T6-E3C5-OFWT@189.217.18.244:10216;rinstance=8b4135488f735cbf;transport=UDP
// pvt->via // SIP/2.0/UDP 54.81.92.135:20001;branch=z9hG4bK58525e18;rport
);
}
fclose(pFile);
// unlock
ast_mutex_lock(&some_lock);
}
C# 应用程序
c# 应用程序是一个控制台应用程序,它打开该文件以读取事件,没什么特别的。
So basically the C application is writing to a text file and the c# application is reading from that text file.
问题
随着时间的推移,文件变得越来越大,我不想遇到麻烦 t运行cating 它并同时创建另一个锁 t运行cates 等等... 利用mkfifo
似乎正是我想要的。由于我对 linux 比较陌生,所以我想确保在使用它之前了解它的工作原理。我了解 C 语言的基础知识(我不是专家)并且希望使用更高效的方法。你们推荐使用 mkfifo、namedpipes 还是 tcp?
示例 1:
mkfifo 在几行的情况下工作得很好,但是当我尝试阅读很多行时它失败了。举个例子:
mkfifo foo.pipe # create a file of type pipe
在终端上写入该文件
echo "hello world" >> foo.pipe # writes hello world AND blocks until someone READS from it
我在一个单独的终端上做:
cat foo.pipe # it will output hello world. This will block too until someone WRITES to that file
示例 2:
mkfifo foo.pipe # create a file of type pipe. If it exists already do not create again
在终端 1 上读取该文件
tail -f foo.pipe # similar to cat foo.pipe but it keeps reading
在终端 2 上写入该文件,但数据很多
echo ~/.bashrc >> foo.pipe # write the content of file ~/.bashrc to that file
这不起作用,控制台上只显示该文件的几行。 如何正确使用 mkfifo 来阅读所有文本?我应该使用不同的方法并改用 tcp 吗?
我会使用 AF_UNIX 套接字连接。
我刚刚结束使用 tcp。我在10秒内发送10,000条短信没有问题。
C代码(客户端)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<arpa/inet.h> //inet_addr
#include<unistd.h>
int send_data(void)
{
int socket_desc;
struct sockaddr_in server;
//Create socket
socket_desc = socket(AF_INET , SOCK_STREAM , 0);
if (socket_desc == -1)
{
printf("Could not create socket \n");
return 1;
}
server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_family = AF_INET;
server.sin_port = htons( 11234 );
//Connect to remote server
if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
{
printf("connect error \n");
close(socket_desc);
return 2;
}
char *message;
message = "hello world";
if( send(socket_desc , message , strlen(message) , 0) < 0)
{
printf("Send failed \n");
close(socket_desc);
return 3;
}
close(socket_desc);
return 0;
}
int main(int argc , char *argv[])
{
// send 1000 messages
for(int i=0; i<1000; i++)
{
send_data();
// 10 milliseconds
usleep(10000);
}
return 0;
}
C# 代码(服务器)
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Ipc_Tcp
{
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 50;
const ushort _port = 11234;
static int _counter = 0;
public static void StartListening()
{
IPEndPoint localEndPoint = new IPEndPoint(System.Net.IPAddress.Loopback, _port);
// Create a TCP/IP socket.
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(localEndPoint);
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("\nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
public static int Main(String[] args)
{
StartListening();
return 0;
}
}
正如@resiliware 所述,最好使用 unix 套接字。
此示例展示了如何使用 unix 套接字在 C 和 C# 之间进行通信:
客户端(在 ubuntu 上用 C 运行 编写)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<unistd.h>
int send_data(void)
{
int sock;
int conn;
struct sockaddr saddr = {AF_UNIX, "/tmp/foo.sock"};
socklen_t saddrlen = sizeof(struct sockaddr) + 6;
sock = socket(AF_UNIX, SOCK_STREAM, 0);
conn = connect(sock, &saddr, saddrlen);
char BUFF[1024];
char *message;
message = "hello world";
if( send(sock , message , strlen(message) , 0) < 0)
{
printf("Send failed \n");
close(sock);
return 3;
}
// I am not sure if I should close both or only the socket.
close(conn);
close(sock);
return 0;
}
int main(int argc , char *argv[])
{
// send 5000 messages
for(int i=0; i<4000; i++)
{
send_data();
// sleep 1 millisecond
usleep(1000);
}
return 0;
}
服务器(在同一台 ubuntu 机器上用 C# 运行 编写)
using System;
using System.Net.Sockets;
using System.Threading;
class Program
{
// unix Endpoint that we will use
const string path = "/tmp/foo.sock";
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 100;
// Counts the number of messages received
static int _counter = 0;
public static void StartListening()
{
if (System.IO.File.Exists(path))
System.IO.File.Delete(path);
// create unix socket
var listener = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
try
{
// listener.Bind(localEndPoint);
listener.Bind(new UnixDomainSocketEndPoint(path));
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
// keep listening for connections
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("\nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
//if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
static void Main(string[] args)
{
StartListening();
}
}
客户端(如果你希望客户端的代码用C#而不是C来写)
using (var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified))
{
socket.Connect(new UnixDomainSocketEndPoint(path));
// send hello world
var dataToSend = System.Text.Encoding.UTF8.GetBytes("Hello-world!");
socket.Send(dataToSend);
}
我们有一个用 c 编写的应用程序,它将 events/notifications 发送到一个用 c# 编写的应用程序。两个应用程序 运行 在同一台 linux 计算机上。
C应用程序:
C 应用程序是 Asterisk 我们修改了源代码(它是开源的)以便它可以将事件发送到我们的 dotnet 控制台应用程序。我们目前发送事件的方式很简单,就是将文本附加到文件中。例如,这就是我们发送新对等点 (ip-phone) 已连接的事件的方式:
// place this on chan_sip.c
// Example: 1-LN-48T6-E3C5-OFWT|10.0.0.103:5868|189.217.18.244|10216|Z 3.9.32144 r32121
if(!ast_sockaddr_isnull(&peer->addr))
{
// lock
ast_mutex_lock(&some_lock);
// write to file
FILE *pFile;
pFile=fopen("/var/log/asterisk/peer-subscriptions.txt", "a");
if(pFile==NULL) { perror("Error opening file."); }
else {
fprintf(pFile,"%s|%s|%s|%s|%s\n",
/* 1-LN-48T6-E3C5-OFWT */ peer->name,
/* 10.0.0.103:5868 */ pvt->initviasentby,
/* 189.217.18.244 */ ast_sockaddr_stringify_addr(&peer->addr),
/* 10216 */ ast_strdupa(ast_sockaddr_stringify_port(&peer->addr)),
/* Z 3.9.32144 */ peer->useragent
// Other:
// peer->fullcontact, // sip:1-LN-48T6-E3C5-OFWT@189.217.18.244:10216;rinstance=8b4135488f735cbf;transport=UDP
// pvt->via // SIP/2.0/UDP 54.81.92.135:20001;branch=z9hG4bK58525e18;rport
);
}
fclose(pFile);
// unlock
ast_mutex_lock(&some_lock);
}
C# 应用程序 c# 应用程序是一个控制台应用程序,它打开该文件以读取事件,没什么特别的。
So basically the C application is writing to a text file and the c# application is reading from that text file.
问题
随着时间的推移,文件变得越来越大,我不想遇到麻烦 t运行cating 它并同时创建另一个锁 t运行cates 等等... 利用mkfifo
似乎正是我想要的。由于我对 linux 比较陌生,所以我想确保在使用它之前了解它的工作原理。我了解 C 语言的基础知识(我不是专家)并且希望使用更高效的方法。你们推荐使用 mkfifo、namedpipes 还是 tcp?
示例 1:
mkfifo 在几行的情况下工作得很好,但是当我尝试阅读很多行时它失败了。举个例子:
mkfifo foo.pipe # create a file of type pipe
在终端上写入该文件
echo "hello world" >> foo.pipe # writes hello world AND blocks until someone READS from it
我在一个单独的终端上做:
cat foo.pipe # it will output hello world. This will block too until someone WRITES to that file
示例 2:
mkfifo foo.pipe # create a file of type pipe. If it exists already do not create again
在终端 1 上读取该文件
tail -f foo.pipe # similar to cat foo.pipe but it keeps reading
在终端 2 上写入该文件,但数据很多
echo ~/.bashrc >> foo.pipe # write the content of file ~/.bashrc to that file
这不起作用,控制台上只显示该文件的几行。 如何正确使用 mkfifo 来阅读所有文本?我应该使用不同的方法并改用 tcp 吗?
我会使用 AF_UNIX 套接字连接。
我刚刚结束使用 tcp。我在10秒内发送10,000条短信没有问题。
C代码(客户端)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<arpa/inet.h> //inet_addr
#include<unistd.h>
int send_data(void)
{
int socket_desc;
struct sockaddr_in server;
//Create socket
socket_desc = socket(AF_INET , SOCK_STREAM , 0);
if (socket_desc == -1)
{
printf("Could not create socket \n");
return 1;
}
server.sin_addr.s_addr = inet_addr("127.0.0.1");
server.sin_family = AF_INET;
server.sin_port = htons( 11234 );
//Connect to remote server
if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
{
printf("connect error \n");
close(socket_desc);
return 2;
}
char *message;
message = "hello world";
if( send(socket_desc , message , strlen(message) , 0) < 0)
{
printf("Send failed \n");
close(socket_desc);
return 3;
}
close(socket_desc);
return 0;
}
int main(int argc , char *argv[])
{
// send 1000 messages
for(int i=0; i<1000; i++)
{
send_data();
// 10 milliseconds
usleep(10000);
}
return 0;
}
C# 代码(服务器)
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
public class Ipc_Tcp
{
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 50;
const ushort _port = 11234;
static int _counter = 0;
public static void StartListening()
{
IPEndPoint localEndPoint = new IPEndPoint(System.Net.IPAddress.Loopback, _port);
// Create a TCP/IP socket.
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(localEndPoint);
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("\nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
public static int Main(String[] args)
{
StartListening();
return 0;
}
}
正如@resiliware 所述,最好使用 unix 套接字。
此示例展示了如何使用 unix 套接字在 C 和 C# 之间进行通信:
客户端(在 ubuntu 上用 C 运行 编写)
#include<stdio.h>
#include<string.h> //strlen
#include<sys/socket.h>
#include<unistd.h>
int send_data(void)
{
int sock;
int conn;
struct sockaddr saddr = {AF_UNIX, "/tmp/foo.sock"};
socklen_t saddrlen = sizeof(struct sockaddr) + 6;
sock = socket(AF_UNIX, SOCK_STREAM, 0);
conn = connect(sock, &saddr, saddrlen);
char BUFF[1024];
char *message;
message = "hello world";
if( send(sock , message , strlen(message) , 0) < 0)
{
printf("Send failed \n");
close(sock);
return 3;
}
// I am not sure if I should close both or only the socket.
close(conn);
close(sock);
return 0;
}
int main(int argc , char *argv[])
{
// send 5000 messages
for(int i=0; i<4000; i++)
{
send_data();
// sleep 1 millisecond
usleep(1000);
}
return 0;
}
服务器(在同一台 ubuntu 机器上用 C# 运行 编写)
using System;
using System.Net.Sockets;
using System.Threading;
class Program
{
// unix Endpoint that we will use
const string path = "/tmp/foo.sock";
// Thread signal.
public static ManualResetEvent _semaphore = new ManualResetEvent(false);
// maximum length of the pending connections queue.
const int _max_length_pending_connections_queue = 100;
// Counts the number of messages received
static int _counter = 0;
public static void StartListening()
{
if (System.IO.File.Exists(path))
System.IO.File.Delete(path);
// create unix socket
var listener = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
try
{
// listener.Bind(localEndPoint);
listener.Bind(new UnixDomainSocketEndPoint(path));
listener.Listen(_max_length_pending_connections_queue);
Console.WriteLine("Waiting for a connection...");
// keep listening for connections
while (true)
{
// Set the event to nonsignaled state.
_semaphore.Reset();
// Start an asynchronous socket to listen for connections.
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);
// Wait until a connection is made before continuing.
_semaphore.WaitOne();
}
}
catch (Exception e)
{
Console.WriteLine("Something bad happened:");
Console.WriteLine(e.ToString());
Console.WriteLine("\nPress ENTER to continue...");
Console.Read();
}
}
// On new connection
public static void AcceptCallback(IAsyncResult ar)
{
// Signal the main thread to continue.
_semaphore.Set();
var cntr = Interlocked.Increment(ref _counter);
// Get the socket that handles the client request.
Socket listener = (Socket)ar.AsyncState;
Socket socket = listener.EndAccept(ar);
var data = new byte[1024];
var i = socket.Receive(data);
// print message every 100 times
//if (cntr % 100 == 0)
Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");
// close socket we are only receiving events
socket.Close();
}
static void Main(string[] args)
{
StartListening();
}
}
客户端(如果你希望客户端的代码用C#而不是C来写)
using (var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified))
{
socket.Connect(new UnixDomainSocketEndPoint(path));
// send hello world
var dataToSend = System.Text.Encoding.UTF8.GetBytes("Hello-world!");
socket.Send(dataToSend);
}