使用 mkfifo 在 linux 和 dotnet 之间进行进程间通信

inter-process communication between linux and dotnet using mkfifo

我们有一个用 c 编写的应用程序,它将 events/notifications 发送到一个用 c# 编写的应用程序。两个应用程序 运行 在同一台 linux 计算机上。

C应用程序:

C 应用程序是 Asterisk 我们修改了源代码(它是开源的)以便它可以将事件发送到我们的 dotnet 控制台应用程序。我们目前发送事件的方式很简单,就是将文本附加到文件中。例如,这就是我们发送新对等点 (ip-phone) 已连接的事件的方式:

// place this on chan_sip.c
// Example: 1-LN-48T6-E3C5-OFWT|10.0.0.103:5868|189.217.18.244|10216|Z 3.9.32144 r32121
if(!ast_sockaddr_isnull(&peer->addr))
{
    // lock 
    ast_mutex_lock(&some_lock);

        // write to file
        FILE *pFile;
        pFile=fopen("/var/log/asterisk/peer-subscriptions.txt", "a");
        if(pFile==NULL) { perror("Error opening file."); }
        else {
                fprintf(pFile,"%s|%s|%s|%s|%s\n",
                /* 1-LN-48T6-E3C5-OFWT */   peer->name,
                /* 10.0.0.103:5868     */   pvt->initviasentby,
                /* 189.217.18.244      */   ast_sockaddr_stringify_addr(&peer->addr),
                /* 10216               */   ast_strdupa(ast_sockaddr_stringify_port(&peer->addr)),
                /* Z 3.9.32144         */   peer->useragent
                    // Other:
                    // peer->fullcontact, // sip:1-LN-48T6-E3C5-OFWT@189.217.18.244:10216;rinstance=8b4135488f735cbf;transport=UDP
                    // pvt->via      //  SIP/2.0/UDP 54.81.92.135:20001;branch=z9hG4bK58525e18;rport
                    );
        }
        fclose(pFile);

    // unlock
    ast_mutex_lock(&some_lock);
 }

C# 应用程序 c# 应用程序是一个控制台应用程序,它打开该文件以读取事件,没什么特别的。

So basically the C application is writing to a text file and the c# application is reading from that text file.

问题

随着时间的推移,文件变得越来越大,我不想遇到麻烦 t运行cating 它并同时创建另一个锁 t运行cates 等等... 利用mkfifo似乎正是我想要的。由于我对 linux 比较陌生,所以我想确保在使用它之前了解它的工作原理。我了解 C 语言的基础知识(我不是专家)并且希望使用更高效的方法。你们推荐使用 mkfifo、namedpipes 还是 tcp?

示例 1:

mkfifo 在几行的情况下工作得很好,但是当我尝试阅读很多行时它失败了。举个例子:

mkfifo foo.pipe # create a file of type pipe

在终端上写入该文件

echo "hello world" >> foo.pipe   # writes hello world AND blocks until someone READS from it

我在一个单独的终端上做:

cat foo.pipe  # it will output hello world. This will block too until someone WRITES to that file

示例 2:

mkfifo foo.pipe # create a file of type pipe. If it exists already do not create again

在终端 1 上读取该文件

tail -f foo.pipe # similar to cat foo.pipe but it keeps reading

在终端 2 上写入该文件,但数据很多

echo ~/.bashrc >> foo.pipe  # write the content of file ~/.bashrc to that file

这不起作用,控制台上只显示该文件的几行。 如何正确使用 mkfifo 来阅读所有文本?我应该使用不同的方法并改用 tcp 吗?

我会使用 AF_UNIX 套接字连接。

我刚刚结束使用 tcp。我在10秒内发送10,000条短信没有问题。

C代码(客户端)

#include<stdio.h>
#include<string.h>  //strlen
#include<sys/socket.h>
#include<arpa/inet.h>   //inet_addr
#include<unistd.h>

int send_data(void)
{
    int socket_desc;
    struct sockaddr_in server;
    
    //Create socket
    socket_desc = socket(AF_INET , SOCK_STREAM , 0);
    if (socket_desc == -1)
    {
        printf("Could not create socket \n");
        return 1;
    }
    
    
        
    server.sin_addr.s_addr = inet_addr("127.0.0.1");
    server.sin_family = AF_INET;
    server.sin_port = htons( 11234 );

    //Connect to remote server
    if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
    {
        printf("connect error \n");
        close(socket_desc);
        return 2;
    }

    char *message;
    message = "hello world";
    if( send(socket_desc , message , strlen(message) , 0) < 0)
    {
        printf("Send failed \n");
        close(socket_desc);
        return 3;
    }

    close(socket_desc);
    return 0;
}

int main(int argc , char *argv[])
{
    // send 1000 messages
    for(int i=0; i<1000; i++)
    {
        send_data();
        // 10 milliseconds
        usleep(10000);
    }
    
    return 0;
}

C# 代码(服务器)

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public class Ipc_Tcp
{
    // Thread signal.  
    public static ManualResetEvent _semaphore = new ManualResetEvent(false);


    // maximum length of the pending connections queue.
    const int _max_length_pending_connections_queue = 50;

    const ushort _port = 11234;

    static int _counter = 0;


    public static void StartListening()
    {
        IPEndPoint localEndPoint = new IPEndPoint(System.Net.IPAddress.Loopback, _port);
        // Create a TCP/IP socket.  
        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        try
        {
            listener.Bind(localEndPoint);
            listener.Listen(_max_length_pending_connections_queue);

            Console.WriteLine("Waiting for a connection...");

            while (true)
            {
                // Set the event to nonsignaled state.  
                _semaphore.Reset();

                // Start an asynchronous socket to listen for connections.  
                
                listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);

                // Wait until a connection is made before continuing.  
                _semaphore.WaitOne();
            }

        }
        catch (Exception e)
        {
            Console.WriteLine("Something bad happened:");
            Console.WriteLine(e.ToString());
            Console.WriteLine("\nPress ENTER to continue...");
            Console.Read();
        }
    }

    // On new connection
    public static void AcceptCallback(IAsyncResult ar)
    {
        // Signal the main thread to continue.  
        _semaphore.Set();

        var cntr = Interlocked.Increment(ref _counter);

        // Get the socket that handles the client request.  
        Socket listener = (Socket)ar.AsyncState;
        Socket socket = listener.EndAccept(ar);

        var data = new byte[1024];
        var i = socket.Receive(data);

        // print message every 100 times
        if (cntr % 100 == 0)
            Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");

        // close socket we are only receiving events
        socket.Close();
    }




    public static int Main(String[] args)
    {
        StartListening();
        return 0;
    }
}

正如@resiliware 所述,最好使用 unix 套接字。

此示例展示了如何使用 unix 套接字在 C 和 C# 之间进行通信:

客户端(在 ubuntu 上用 C 运行 编写)

#include<stdio.h>
#include<string.h>  //strlen
#include<sys/socket.h>
#include<unistd.h>

int send_data(void)
{
    int sock;
    int conn;

    struct sockaddr saddr = {AF_UNIX, "/tmp/foo.sock"};
    socklen_t saddrlen = sizeof(struct sockaddr) + 6;

    sock = socket(AF_UNIX, SOCK_STREAM, 0);
    conn = connect(sock, &saddr, saddrlen);

    char BUFF[1024];

    char *message;
    message = "hello world";
    if( send(sock , message , strlen(message) , 0) < 0)
    {
        printf("Send failed \n");
        close(sock);
        return 3;
    }

    // I am not sure if I should close both or only the socket. 
    close(conn);
    close(sock);

    return 0;
}

int main(int argc , char *argv[])
{
    // send 5000 messages
    for(int i=0; i<4000; i++)
    {
        send_data();
        // sleep 1 millisecond
        usleep(1000);
    }
    
    return 0;
}

服务器(在同一台 ubuntu 机器上用 C# 运行 编写)

using System;
using System.Net.Sockets;
using System.Threading;

class Program
{
    // unix Endpoint that we will use
    const string path = "/tmp/foo.sock";

    // Thread signal.  
    public static ManualResetEvent _semaphore = new ManualResetEvent(false);

    // maximum length of the pending connections queue.
    const int _max_length_pending_connections_queue = 100;

    // Counts the number of messages received
    static int _counter = 0;

    public static void StartListening()
    {
        if (System.IO.File.Exists(path))
            System.IO.File.Delete(path);

        // create unix socket
        var listener = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);

        try
        {
            // listener.Bind(localEndPoint);
            listener.Bind(new UnixDomainSocketEndPoint(path));

            listener.Listen(_max_length_pending_connections_queue);

            Console.WriteLine("Waiting for a connection...");

            // keep listening for connections
            while (true)
            {
                // Set the event to nonsignaled state.  
                _semaphore.Reset();

                // Start an asynchronous socket to listen for connections.  

                listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);

                // Wait until a connection is made before continuing.  
                _semaphore.WaitOne();
            }

        }
        catch (Exception e)
        {
            Console.WriteLine("Something bad happened:");
            Console.WriteLine(e.ToString());
            Console.WriteLine("\nPress ENTER to continue...");
            Console.Read();
        }
    }

    // On new connection
    public static void AcceptCallback(IAsyncResult ar)
    {
        // Signal the main thread to continue.  
        _semaphore.Set();

        var cntr = Interlocked.Increment(ref _counter);

        // Get the socket that handles the client request.  
        Socket listener = (Socket)ar.AsyncState;
        Socket socket = listener.EndAccept(ar);

        var data = new byte[1024];
        var i = socket.Receive(data);

        // print message every 100 times
        //if (cntr % 100 == 0)
        Console.WriteLine($"[{cntr}] Received data: {System.Text.Encoding.UTF8.GetString(data, 0, i)}");

        // close socket we are only receiving events
        socket.Close();

    }

    static void Main(string[] args)
    {
        StartListening();        
    }
}

客户端(如果你希望客户端的代码用C#而不是C来写)

        using (var socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified))
        {
            socket.Connect(new UnixDomainSocketEndPoint(path));

            // send hello world
            var dataToSend = System.Text.Encoding.UTF8.GetBytes("Hello-world!");

            socket.Send(dataToSend);
        }