响应式扩展:使用 Rx 创建一个处理文件的管道

Reactive Extensions: Creating a pipeline with Rx that works with files

我有一个包含三个步骤的流程管道:

  1. video to images: 我有一个视频被转换成静止图像 (帧)
  2. 帧压缩文件:处理完视频中的所有帧后,我应该用它们创建一个 Zip 文件。
  3. zip 文件 => 上传到 FTP

它涉及两个一次性项目:视频捕获和 zip 文件。

我如何使用 Rx 处理它?我不知道如何开始。

您是否必须传递每个步骤的原始对象。 视频捕获或 zip 文件是 "disposed" 应该没问题,因为我确信该操作会有一些副作用(MemoryStream,将文件写入磁盘等)。 您可以将指向每个操作结果的指针(Uri 的?)传递到管道的下一部分吗?

void Main()
{
    var video = new Uri("https://www.youtube.com/watch?v=Tp5mRlHwZ7M");
    var query = from frames in TranscodeVideoToImages(video)
        from zipFile in ZipFiles(frames)
        from uploadLocation in UploadFile(zipFile)
        select uploadLocation;
    query.Subscribe(...)
}
private IObservable<Uri[]> TranscodeVideoToImages(Uri imageSource)
{
    //Do some long running (async) work here.
    // Save work to disk
    // Return location of saved work
}
private IObservable<Uri> ZipFiles(Uri[] files)
{
    //Run the zip process on all of the files
    //  Return the location of the zip file
}
private IObservable<Uri> UploadFile(Uri source)
{
    //Upload the File. 
    //Probably as simple as as task based operation with .ToObservable()
}