TPL Dataflow ,完成一个 Block ,重新创建一个 BLock

TPL Dataflow , finish a Block , re-create a BLock

我正在使用 TPL Dataflow 显示视频,同时首先通过 TCP 将数据传递到板。我正在使用 CancellationTokenSource 来取消 Block 活动。但问题是,当我重新运行“CreateVideoProcessingNetwork”函数时我没有反应。 .Post() 命令 returns 错误。我应该如何重新创建或“重新运行”TPL 数据流?这是代码:

private void TPL_Click(object sender, EventArgs e)
        {
            CreateVideoProcessingNetwork();
        }

        

        public async void CreateVideoProcessingNetwork()
        {
            string video_path = @"C:\.....\video_640x360_360p.mp4";

            
            _canceller = new CancellationTokenSource();
            /****************** METHOD 1 - with yield *************/

            /* Video Loading TPL Block */
            //var video_loader = new TransformManyBlock<string, Bitmap>(load_video, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });


            var send_recv_block = new TransformBlock<Bitmap, Bitmap>(async recv_bitmap =>
            {
                Console.WriteLine("Inside send_recv block");
                var mem_stream = new MemoryStream();
                recv_bitmap.Save(mem_stream, System.Drawing.Imaging.ImageFormat.Jpeg);
                var recv_image_array = mem_stream.ToArray();


                NetworkStream stream = client.GetStream();
                byte[] transmit_buffer = new byte[4];
                transmit_buffer[0] = (byte)(recv_image_array.Length & (0xFF));
                transmit_buffer[1] = (byte)((recv_image_array.Length >> 8) & (0xFF));
                transmit_buffer[2] = (byte)((recv_image_array.Length >> 16) & (0xFF));
                transmit_buffer[3] = (byte)((recv_image_array.Length >> 24) & (0xFF));
                // Sending first the 32bit length
                await stream.WriteAsync(transmit_buffer, 0, 4);
                // Sending data
                await stream.WriteAsync(recv_image_array, 0, recv_image_array.Length);
                // Receiving data
                var recv_buffer = await Receive(stream);

                Bitmap tx_image_array;
                using (var ms = new MemoryStream(recv_image_array))
                {
                    tx_image_array = new Bitmap(ms);
                }

                return tx_image_array;
                

            },
            new ExecutionDataflowBlockOptions
            {
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            });



            /****************** METHOD 2 - with send async  ***********/
            var video_loader = new ActionBlock<string>(async path =>
            {

 

                Console.WriteLine("video_loader");
                capture = new VideoCapture(path);
                Mat matrix = new Mat();
                capture.Read(matrix);
                var mem_stream = new MemoryStream();

                while (matrix.Rows != 0 && matrix.Width != 0)
                {

                    Console.WriteLine("Inside Loop");
                    capture.Read(matrix);
                    if (matrix.Rows == 0 && matrix.Width == 0) break;
                    Bitmap bitmap = new Bitmap(matrix.Width, matrix.Rows);
                    bitmap = matrix.ToBitmap();

                    await send_recv_block.SendAsync(bitmap);
                    await Task.Delay(20);
                    if (_canceller.Token.IsCancellationRequested) break;

                }

            }, new ExecutionDataflowBlockOptions 
            {
                //BoundedCapacity = 10 ,
                CancellationToken = cancellationSource.Token
            });



            /* Video Loading TPL Block */
            var display_video = new ActionBlock<Bitmap>(async received_image =>
            {
                Console.WriteLine("Inside Display Video");
                PicturePlot2.Image = received_image;
                await Task.Delay(25);
            },
            new ExecutionDataflowBlockOptions()
            {
                TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
                //BoundedCapacity = 10,
                CancellationToken = cancellationSource.Token
            });


            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };


            /****************** METHOD 2 - with send async *************/
            Console.WriteLine("to Link");
            var send_recv_disposable = send_recv_block.LinkTo(display_video, linkOptions);
            Console.WriteLine("Video path" + video_path);
            //var apotelesma_post = video_loader.Post(video_path);
            var apotelesma_post = await video_loader.SendAsync(video_path);
            Console.WriteLine("Apotelesma Post "+ apotelesma_post);
            video_loader.Complete();
            try
            {
                await display_video.Completion;
            }
            catch (TaskCanceledException ex)
            {
                Console.WriteLine(ex.CancellationToken.IsCancellationRequested);
                video_loader.Complete();
                send_recv_block.Complete();
                display_video.Complete();
                MessageBox.Show("Video Ended");
                
            }

            
        }
            
        private  void Stop_Reset_Click(object sender, EventArgs e)
        {
            cancellationSource.Cancel();
            _canceller.Cancel();



        }

提前致谢

您没有显示您声明变量 cancellationSource 的位置,该变量提供给 ExecutionDataFlowBlockOptions

当您向 ExecutionDataFlowBlockOptions 提供取消令牌时,您是在告诉块在令牌被取消时进入已完成状态,任务状态为已取消。文档告诉我们这是最终的:

Because the CancellationToken property permanently cancels dataflow block execution, the whole pipeline must be recreated after the user cancels the operation and then wants to add more work items to the pipeline.[1]

因为您的停止按钮将此标记设置为已取消,所以当您重新创建块时,它们将以已取消状态开始。

_canceller = new CancellationTokenSource();上面你需要添加cancellationSource = new CancellationTokenSource();.