如何在 RxUI 中正确实现后端 ReactiveList 以避免线程关联问题?

How to properly implement backend ReactiveList in RxUI to avoid thread affinity issue?

我是 Rx.Net 和 RxUI 的新手。在学习这两个库的过程中,我尝试构建一个从网站提取图像的演示应用程序。我使用 WPF 结合 Rx.Net 和 RxUI 来构建视图和视图模型,并使用 HtmlAgilityPack 来处理 html 文档。我的代码如下所示

视图模型:

public class MainViewModel : ReactiveObject
{
    public MainViewModel()
    {
        var canSearch =
            this.WhenAnyValue(x => x.TargetUrl, targetWebSite => !string.IsNullOrEmpty(targetWebSite));
        _searchCommand = ReactiveCommand.CreateFromTask(GetHtmlDocument, canSearch);
        _imageSequence = _searchCommand
            .SelectMany(ImageExtractService.ExtractAllImageAddress).Distinct().Publish().RefCount();
        _imageSequence.Subscribe(
            url => ImageList.Add(new ScrappedWebImageViewModel { ImageUrl = url }),
            ex => this.Log().Error(ex)); //Causing problem, need better solution
    }

    private readonly IObservable<string> _imageSequence;

    private IHtmlDownloadService _htmlDownloadService;

    private IHtmlDownloadService HtmlDownloadService =>
        _htmlDownloadService ?? (_htmlDownloadService = Locator.Current.GetService<IHtmlDownloadService>());

    private IImageExtractService _imageExtractService;

    private IImageExtractService ImageExtractService =>
        _imageExtractService ?? (_imageExtractService = Locator.Current.GetService<IImageExtractService>());

    public ReactiveList<ScrappedWebImageViewModel> ImageList =
        new ReactiveList<ScrappedWebImageViewModel>();

    private readonly ReactiveCommand<Unit, HtmlDocument> _searchCommand;

    public ICommand SearchCommand => _searchCommand;

    private async Task<HtmlDocument> GetHtmlDocument()
    {
        return await HtmlDownloadService.GetHtmlDocument(TargetUrl);
    }
}

查看:

public partial class MainWindow : IViewFor<MainViewModel>
{
    public MainWindow()
    {
        InitializeComponent();

        ViewModel = new MainViewModel();
        this.WhenActivated(d => 
        {
            d(this.Bind(ViewModel, x => x.Status, x => x.TblStatus.Text));
            d(this.Bind(ViewModel, x => x.Progress, x => x.TblProgress.Text));
            d(this.Bind(ViewModel, x => x.TargetUrl, x => x.TbxTargetWebSite.Text));
            d(this.OneWayBind(ViewModel, x => x.ImageList, x => x.LbxImageList.ItemsSource));
            d(this.BindCommand(ViewModel, x => x.SearchCommand, x => x.BtnBeginSearch));
        });
    }

    public static readonly DependencyProperty ViewModelProperty =
        DependencyProperty.Register("ViewModel", typeof(MainViewModel), typeof(MainWindow));

    public MainViewModel ViewModel
    {
        get => (MainViewModel) GetValue(ViewModelProperty);
        set => SetValue(ViewModelProperty, value);
    }

    object IViewFor.ViewModel
    {
        get => ViewModel;
        set => ViewModel = (MainViewModel)value;
    }
}

HtmlDownloadService:

internal class HtmlDownloadService : IHtmlDownloadService
{
    private readonly HtmlWeb _webClient = new HtmlWeb();

    public async Task<HtmlDocument> GetHtmlDocument(string url)
    {
        return await Task.Run(() => _webClient.Load(url));
    }
}

ImageExtractService:

internal class ImageExtractService : IImageExtractService
{
    public IEnumerable<string> ExtractAllImageAddress(HtmlDocument doc)
    {
        const string mstring = @".+\.(jpg|png|ico|jpeg|bmp|tif)$";
        var hrefList = doc.DocumentNode.SelectNodes(@".//*[@href]");
        var srcList = doc.DocumentNode.SelectNodes(@".//*[@src]");
        if (hrefList != null)
        {
            foreach (var href in hrefList)
            {
                var attr = href.Attributes["href"];
                if (Regex.IsMatch(attr.Value, mstring))
                {
                    yield return attr.Value;
                }
            }
        }

        if (srcList == null) yield break;
        foreach (var src in srcList)
        {
            var attr = src.Attributes["src"];
            if (Regex.IsMatch(attr.Value, mstring))
            {
                yield return attr.Value;
            }
        }
    }
}

问题是,命令执行后,应用程序将立即停止。此时主线程是运行 in

System.Reactive.dll!System.Reactive.Concurrency.AsyncLock.Wait

但是没有抛出异常,如果允许继续,应用程序将退出。我尝试 quote/unquote 几行,看起来这只是 'thread affinity' 问题的另一个实例。但我不知道如何解决这个问题。我的问题简而言之是:

  1. 如何以最合适的方式更新虚拟机?
  2. 如何捕获关闭应用程序的异常?

更新:

我尝试了一些没有可观察对象的其他方法

public MainViewModel()
{
    var canSearch =
        this.WhenAnyValue(x => x.TargetUrl, targetWebSite => !string.IsNullOrEmpty(targetWebSite));
    SearchCommand = ReactiveCommand.CreateFromTask(SearchImageAsync, canSearch, ThreadPoolScheduler.Instance);
}

private async Task SearchImageAsync()
{
    var doc = await HtmlDownloadService.GetHtmlDocument(TargetUrl);
    var imgs = ImageExtractService.ExtractAllImageAddress(doc);
    foreach (var url in imgs)
    {
        ImageList.Add(new ScrappedWebImageViewModel {ImageUrl = url});
    }
}

但是还是解决不了。我使用的是 Rx.Net 和 RxUI 的最新 unstable(alpha/preview) 版本,几乎没有示例代码可供我入门。所以如果有人能提供一些,那将是一个巨大的帮助,谢谢!

改变

_imageSequence.Subscribe(

_imageSequence.ObserveOn(RxApp.MainThreadScheduler).Subscribe(