Consume EventArgs from CouchDB

I am struggling to consume data that I fetch from my CouchDB database.

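I am trying to consume new data from a specific view. CouchDB offers a feed=continuous option, but when I tested it I got no data, and the same happened in Postman. If I change it to feed=eventsource, I can see the changes in the console, but I do not know how to handle those events. I have set up a method that connects correctly, but now I am stuck. Any help would be great.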

public async Task ObserveDbAndTrigger()
    {
        var url = "http://localhost:5984/MyDB/_changes?feed=eventsource&filter=_view&view=MyView&include_docs=true&attachments=true&heartbeat=1000&since=0";

        using (var client = new HttpClient())
        {
            client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
            client.DefaultRequestHeaders.Add("Accept", "application/json");
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(System.Text.ASCIIEncoding.ASCII.GetBytes($"user:password" + $"")));

            var request = new HttpRequestMessage(HttpMethod.Get, url);

            // handle the incoming events and work with the incoming data
            
        }
    }

Any suggestions?

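Clearly there is work left to do. Normally I shy away from answering questions posed like this, since it reads like a request for a code-writing service, but I believe this answer may benefit others besides the OP.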

Here is a very naive piece of code intended to illustrate event delegates and how simple it is to talk to CouchDB over TCP.

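Ultimately this demonstrates the publish/subscribe pattern, which is a reasonable fit here. I tested it against CouchDB 2.3 on Windows. The code is hardwired to localhost:5984 because, well, whatever.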

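using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class NaiveChangeWatcher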
{
    static void Main(string[] args)
    {
        if (args.Length >= 4)
        {
            // set up server info.
            string db = args[0];
            string auth = "Basic " + Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(String.Join(":", args[1], args[2])));
            string query = db + "/_changes?feed=continuous&since=0&heartbeat=" + args[3];
            // init the publisher
            ChangesPublisher pub = new ChangesPublisher();
            // let's subscribe to the OnChange event which writes event data to the console.
            pub.OnChange += (sender, e) => Console.WriteLine(e.Value);
            pub.OnException += (sender, e) => Console.WriteLine(e.Value.ToString() + "\r\n\r\nPress a key to exit.");
            // start publishing.
            Task.Run(async () =>
            {
                await pub.Begin("localhost", 5984, query, auth, int.Parse(args[3]));
            });
            // Press a key when bored of it all
            Console.ReadKey();
            // stop the publisher gracefully
            pub.Stop = true;
        }
        else
        {
            // the fourth argument is passed to CouchDB as the heartbeat interval
            Console.WriteLine("usage: NaiveChangeWatcher db_name username password heartbeat_millis");
        }
    }
    //
    // The ChangesPublisher notifies subscribers of new data from the changes feed
    // via the ChangeEvent. The publisher will trigger an OnException event in the
    // event of an exception prior to ending its task.
    //
    public class ChangesPublisher
    {
        // Set to true to stop publishing. This causes the Begin method to complete.
        public bool Stop { get; set; }
        // The event posted when data from the server arrives
        public class ChangeEvent : EventArgs
        {
            public string Value { get; set; }
            public ChangeEvent(string value)
            {
                Value = value;
            }
        }
        // Event triggered when the publisher croaks by exception
        public class ExceptionEvent : EventArgs
        {
            public Exception Value { get; set; }
            public ExceptionEvent(Exception value)
            {
                Value = value;
            }
        }
        // Subscription to changes from  the _changes endpoint
        public event EventHandler<ChangeEvent> OnChange = delegate { };
        // Subscription to publisher exit on error
        public event EventHandler<ExceptionEvent> OnException = delegate { };
        
        public async Task Begin(string serverAddr, int port, string query, string auth, int timeout)
        {
            using (var client = new TcpClient())
            {                  
                string request = String.Join("\r\n", new List<string> {
                    String.Format("GET /{0} HTTP/1.1",query),
                    "Authorization: " + auth,
                    "Accept: application/json",
                    "Host: " + serverAddr + ":" + port,
                    "Connection: keep-alive",
                    "\r\n"
                });
                 
                try
                {
                    await client.ConnectAsync(serverAddr, port);
                    using (NetworkStream stream = client.GetStream())
                    {                           
                        StreamWriter writer = new StreamWriter(stream);
                        await writer.WriteAsync(request);
                        await writer.FlushAsync();

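                        // read lines from the server, ad nauseam. Because this is raw TCP,
                        // the lines include the HTTP response headers and, if the server
                        // uses chunked transfer encoding, the chunk-size lines as well.
                        StreamReader reader = new StreamReader(stream);
                        while (!Stop)
                        {
                            string data = await reader.ReadLineAsync();
                            // a null line means the server closed the connection
                            if (data == null) break;
                            // emit a change event
                            OnChange(this, new ChangeEvent(data));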
                        }
                    }
                }
                catch (Exception e)
                {
                    OnException(this, new ExceptionEvent(e));                        
                }
            }
        }
    }
}
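
The watcher above hands subscribers raw lines. For anyone who wants to work with the change rows as data, here is a minimal sketch of one way to do that, assuming .NET Core 3.0 or later (for System.Text.Json). ChangeLineReader and ParsedChange are hypothetical names that are not part of the code above, and the sample rows in Main are made up. It reads from any TextReader, skips the empty heartbeat lines, and raises one event per parsed row.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event payload: the id and seq fields of one change row.
public class ParsedChange : EventArgs
{
    public string Id { get; }
    public string Seq { get; }
    public ParsedChange(string id, string seq) { Id = id; Seq = seq; }
}

// Hypothetical helper: reads a continuous _changes feed line by line
// and raises OnChange once per non-empty JSON line.
public class ChangeLineReader
{
    public event EventHandler<ParsedChange> OnChange = delegate { };

    // Returns the number of change rows seen before the feed ended
    // or cancellation was requested.
    public async Task<int> ReadAsync(TextReader reader, CancellationToken token)
    {
        int count = 0;
        while (!token.IsCancellationRequested)
        {
            string line = await reader.ReadLineAsync();
            if (line == null) break;         // the feed was closed
            if (line.Length == 0) continue;  // heartbeat: a bare newline

            using (JsonDocument doc = JsonDocument.Parse(line))
            {
                JsonElement root = doc.RootElement;
                // Skip rows without an id, such as a trailing last_seq row.
                if (!root.TryGetProperty("id", out JsonElement id)) continue;
                // seq is a string in CouchDB 2.x and a number in 1.x;
                // ToString() yields usable text for both.
                string seq = root.TryGetProperty("seq", out JsonElement s)
                    ? s.ToString()
                    : "";
                OnChange(this, new ParsedChange(id.GetString(), seq));
                count++;
            }
        }
        return count;
    }
}

public static class Demo
{
    public static async Task Main()
    {
        // Two made-up change rows separated by a heartbeat line.
        string sample =
            "{\"seq\":\"1-abc\",\"id\":\"doc1\",\"changes\":[{\"rev\":\"1-x\"}]}\n" +
            "\n" +
            "{\"seq\":\"2-def\",\"id\":\"doc2\",\"changes\":[{\"rev\":\"1-y\"}]}\n";

        var feed = new ChangeLineReader();
        feed.OnChange += (sender, e) => Console.WriteLine(e.Id + " @ " + e.Seq);
        int n = await feed.ReadAsync(new StringReader(sample), CancellationToken.None);
        Console.WriteLine(n + " changes");
    }
}
```

Against a live server, the same reader could be fed from HttpClient instead of a StringReader: request the feed with GetAsync(url, HttpCompletionOption.ResponseHeadersRead), wrap the result of Content.ReadAsStreamAsync() in a StreamReader, and pass that in. HttpClient strips the response headers and decodes chunked transfer encoding, so only the feed lines reach the parser. I have not tested that variant against CouchDB.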