异步任务,视频缓冲

Asynchronous Task, video buffering

我正在尝试理解 C# 中的任务,但仍然遇到一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我正在使用 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回。首先,我是连续做的。因此,读取 Bitmap,从板上发送接收,然后绘图。但是读取位图并绘制它们需要花费太多时间。我想要一个保存视频帧的传输、接收 FIFO 缓冲区,以及一个不同的任务来完成发送接收每个帧的工作。所以我想并行进行。我想我应该创建 3 个任务:

        tasks.Add(Task.Run(() => Video_load(video_path)));
        tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
        tasks.Add(Task.Run(() => VideoDisp_hw(32)));

我想运行“并行”。我应该使用什么类型的对象?并发队列?缓冲块?或者只是一个列表?

多谢指教!我想请教一件事。我正在尝试创建一个带有 2 个 TPL 块的简单控制台程序。 1 个块将是 Transform 块(获取消息,即“开始”)并将数据加载到 List,另一个块将是 ActionBlock(只是从列表中读取数据并打印它们)。这是下面的代码:

namespace TPL_Dataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Random randn = new Random();

            var loadData = new TransformBlock<string, List<int>>(async sample_string =>
           {
               List<int> input_data = new List<int>();
               int cnt = 0;

                if (sample_string == "start")
                {
                   Console.WriteLine("Inside loadData");
                   while (cnt < 16)
                   {
                       input_data.Add(randn.Next(1, 255));
                       await Task.Delay(1500);
                       Console.WriteLine("Cnt");
                       cnt++;
                   }
                                    }
                else
                {
                    Console.WriteLine("Not started yet");

                }
            return input_data;
           });


            var PrintData = new ActionBlock<List<int>>(async input_data =>
            {
                while(input_data.Count > 0)
                {


                    Console.WriteLine("output Data = " + input_data.First());
                    await Task.Delay(1000);
                    input_data.RemoveAt(0);
                    
                }
 

              });

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            loadData.LinkTo(PrintData, input_data => input_data.Count() >0  );
            //loadData.LinkTo(PrintData, linkOptions);
            
            loadData.SendAsync("start");
            loadData.Complete();
            PrintData.Completion.Wait();

        }
    }
}

但它似乎以串行方式工作..我做错了什么?我试着做 while 循环异步。我想同时做这两件事。当列表中的数据可用时,然后绘制。

您可以使用 TransformManyBlock<string, int> as the producer block, and an ActionBlock<int> as the consumer block. The TransformManyBlock would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>> delegate, and passed an iterator method(下例中的 Produce 方法)逐个生成值:

Random random = new Random();

var producer = new TransformManyBlock<string, int>(Produce);

IEnumerable<int> Produce(string message)
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            yield return value;
            Thread.Sleep(1500);
            cnt++;
        }
    }
    else
    {
        yield break;
    }
}

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

producer.LinkTo(consumer, new() { PropagateCompletion = true });

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

不幸的是,生产者必须在产生每个值 (Thread.Sleep(1500);) 之间的空闲期间阻塞工作线程,因为 TransformManyBlock 当前没有接受 [=19= 的构造函数].这可能会在 TPL Dataflow library. You could track this GitHub 问题的下一个版本中得到修复,以了解何时发布此功能。


备选解决方案: 除了显式链接生产者和消费者之外,您还可以让它们保持未链接状态,并手动将生产者产生的值发送给消费者。在这种情况下,两个块都是 ActionBlocks:

Random random = new Random();

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

var producer = new ActionBlock<string>(async message =>
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1, 255);
            Console.WriteLine($"Producing #{value}");
            var accepted = await consumer.SendAsync(value);
            if (!accepted) break; // The consumer has failed
            await Task.Delay(1500);
            cnt++;
        }
    }
});

PropagateCompletion(producer, consumer);

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
    try { await source.Completion.ConfigureAwait(false); } catch { }
    var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
    if (ex != null) target.Fault(ex); else target.Complete();
}

这种方法的主要困难是如何将生产者的完成传播给消费者,以便最终完成两个块。显然你不能使用 new DataflowLinkOptions { PropagateCompletion = true } 配置,因为块没有明确链接。您也不能 Complete 手动消费者,因为在这种情况下,它会停止过早地接受来自生产者的值。这个问题的解决方法就是上面例子中的PropagateCompletion方法