异步任务,视频缓冲
Asynchronous Task, video buffering
我正在尝试理解 C# 中的任务,但仍然遇到一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我正在使用 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回。首先,我是连续做的。因此,读取 Bitmap
,从板上发送接收,然后绘图。但是读取位图并绘制它们需要花费太多时间。我想要一个保存视频帧的传输、接收 FIFO 缓冲区,以及一个不同的任务来完成发送接收每个帧的工作。所以我想并行进行。我想我应该创建 3 个任务:
tasks.Add(Task.Run(() => Video_load(video_path)));
tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
tasks.Add(Task.Run(() => VideoDisp_hw(32)));
我想运行“并行”。我应该使用什么类型的对象?并发队列?缓冲块?或者只是一个列表?
多谢指教!我想请教一件事。我正在尝试创建一个带有 2 个 TPL 块的简单控制台程序。 1 个块将是 Transform 块(获取消息,即“开始”)并将数据加载到 List,另一个块将是 ActionBlock(只是从列表中读取数据并打印它们)。这是下面的代码:
namespace TPL_Dataflow
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
Random randn = new Random();
var loadData = new TransformBlock<string, List<int>>(async sample_string =>
{
List<int> input_data = new List<int>();
int cnt = 0;
if (sample_string == "start")
{
Console.WriteLine("Inside loadData");
while (cnt < 16)
{
input_data.Add(randn.Next(1, 255));
await Task.Delay(1500);
Console.WriteLine("Cnt");
cnt++;
}
}
else
{
Console.WriteLine("Not started yet");
}
return input_data;
});
var PrintData = new ActionBlock<List<int>>(async input_data =>
{
while(input_data.Count > 0)
{
Console.WriteLine("output Data = " + input_data.First());
await Task.Delay(1000);
input_data.RemoveAt(0);
}
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loadData.LinkTo(PrintData, input_data => input_data.Count() >0 );
//loadData.LinkTo(PrintData, linkOptions);
loadData.SendAsync("start");
loadData.Complete();
PrintData.Completion.Wait();
}
}
}
但它似乎以串行方式工作..我做错了什么?我试着做 while 循环异步。我想同时做这两件事。当列表中的数据可用时,然后绘制。
您可以使用 TransformManyBlock<string, int>
as the producer block, and an ActionBlock<int>
as the consumer block. The TransformManyBlock
would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>>
delegate, and passed an iterator method(下例中的 Produce
方法)逐个生成值:
Random random = new Random();
var producer = new TransformManyBlock<string, int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer, new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
不幸的是,生产者必须在产生每个值 (Thread.Sleep(1500);
) 之间的空闲期间阻塞工作线程,因为 TransformManyBlock
当前没有接受 [=19= 的构造函数].这可能会在 TPL Dataflow library. You could track this GitHub 问题的下一个版本中得到修复,以了解何时发布此功能。
备选解决方案: 除了显式链接生产者和消费者之外,您还可以让它们保持未链接状态,并手动将生产者产生的值发送给消费者。在这种情况下,两个块都是 ActionBlock
s:
Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer, consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}
这种方法的主要困难是如何将生产者的完成传播给消费者,以便最终完成两个块。显然你不能使用 new DataflowLinkOptions { PropagateCompletion = true }
配置,因为块没有明确链接。您也不能 Complete
手动消费者,因为在这种情况下,它会停止过早地接受来自生产者的值。这个问题的解决方法就是上面例子中的PropagateCompletion
方法
我正在尝试理解 C# 中的任务,但仍然遇到一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我正在使用 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回。首先,我是连续做的。因此,读取 Bitmap
,从板上发送接收,然后绘图。但是读取位图并绘制它们需要花费太多时间。我想要一个保存视频帧的传输、接收 FIFO 缓冲区,以及一个不同的任务来完成发送接收每个帧的工作。所以我想并行进行。我想我应该创建 3 个任务:
tasks.Add(Task.Run(() => Video_load(video_path)));
tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
tasks.Add(Task.Run(() => VideoDisp_hw(32)));
我想运行“并行”。我应该使用什么类型的对象?并发队列?缓冲块?或者只是一个列表?
多谢指教!我想请教一件事。我正在尝试创建一个带有 2 个 TPL 块的简单控制台程序。 1 个块将是 Transform 块(获取消息,即“开始”)并将数据加载到 List,另一个块将是 ActionBlock(只是从列表中读取数据并打印它们)。这是下面的代码:
namespace TPL_Dataflow
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
Random randn = new Random();
var loadData = new TransformBlock<string, List<int>>(async sample_string =>
{
List<int> input_data = new List<int>();
int cnt = 0;
if (sample_string == "start")
{
Console.WriteLine("Inside loadData");
while (cnt < 16)
{
input_data.Add(randn.Next(1, 255));
await Task.Delay(1500);
Console.WriteLine("Cnt");
cnt++;
}
}
else
{
Console.WriteLine("Not started yet");
}
return input_data;
});
var PrintData = new ActionBlock<List<int>>(async input_data =>
{
while(input_data.Count > 0)
{
Console.WriteLine("output Data = " + input_data.First());
await Task.Delay(1000);
input_data.RemoveAt(0);
}
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loadData.LinkTo(PrintData, input_data => input_data.Count() >0 );
//loadData.LinkTo(PrintData, linkOptions);
loadData.SendAsync("start");
loadData.Complete();
PrintData.Completion.Wait();
}
}
}
但它似乎以串行方式工作..我做错了什么?我试着做 while 循环异步。我想同时做这两件事。当列表中的数据可用时,然后绘制。
您可以使用 TransformManyBlock<string, int>
as the producer block, and an ActionBlock<int>
as the consumer block. The TransformManyBlock
would be instantiated with the constructor that accepts a Func<string, IEnumerable<int>>
delegate, and passed an iterator method(下例中的 Produce
方法)逐个生成值:
Random random = new Random();
var producer = new TransformManyBlock<string, int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer, new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
不幸的是,生产者必须在产生每个值 (Thread.Sleep(1500);
) 之间的空闲期间阻塞工作线程,因为 TransformManyBlock
当前没有接受 [=19= 的构造函数].这可能会在 TPL Dataflow library. You could track this GitHub 问题的下一个版本中得到修复,以了解何时发布此功能。
备选解决方案: 除了显式链接生产者和消费者之外,您还可以让它们保持未链接状态,并手动将生产者产生的值发送给消费者。在这种情况下,两个块都是 ActionBlock
s:
Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1, 255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer, consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}
这种方法的主要困难是如何将生产者的完成传播给消费者,以便最终完成两个块。显然你不能使用 new DataflowLinkOptions { PropagateCompletion = true }
配置,因为块没有明确链接。您也不能 Complete
手动消费者,因为在这种情况下,它会停止过早地接受来自生产者的值。这个问题的解决方法就是上面例子中的PropagateCompletion
方法