Memory issue in TPL Dataflow implementation of IO read write operation

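I tried to implement read and write operations with file IO and wrap them in a TransformBlock, so that the operations are thread-safe without using a locking mechanism.

The problem is that when I try to write 5 files in parallel, I get an out-of-memory exception, and this implementation blocks the UI thread. The implementation is in a Windows Phone project. Please point out the errors in this implementation.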

File IO operations

public static readonly IsolatedStorageFile _isolatedStore = IsolatedStorageFile.GetUserStoreForApplication();
public static readonly FileIO _file = new FileIO();
public static readonly ConcurrentExclusiveSchedulerPair taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
public static readonly ExecutionDataflowBlockOptions exclusiveExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ExclusiveScheduler,
    BoundedCapacity = 1
};

public static readonly ExecutionDataflowBlockOptions concurrentExecutionDataFlow 
    = new ExecutionDataflowBlockOptions
{
    TaskScheduler = taskSchedulerPair.ConcurrentScheduler,
    BoundedCapacity = 1
};

public static async Task<T> LoadAsync<T>(string fileName)
{
    T result = default(T);

    var transBlock = new TransformBlock<string, T>
       (async fName =>
       {
           return await LoadData<T>(fName);
       }, concurrentExecutionDataFlow);

    transBlock.Post(fileName);

    result = await transBlock.ReceiveAsync();

    return result;
}

public static async Task SaveAsync<T>(T obj, string fileName)
{
    var transBlock = new TransformBlock<Tuple<T, string>, Task>
       (async tupleData =>
       {
          await SaveData(tupleData.Item1, tupleData.Item2);
       }, exclusiveExecutionDataFlow);

    transBlock.Post(new Tuple<T, string>(obj, fileName));

    await transBlock.ReceiveAsync();
}
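Since the thread-safety argument here rests on ConcurrentExclusiveSchedulerPair, the sketch below (the `SchedulerPairDemo` name and the 20 ms sleep are illustrative assumptions) checks what the pair actually guarantees: a pair created with the parameterless constructor runs its tasks on the default thread pool, and tasks on `ExclusiveScheduler` never run at the same time. Note that for an async delegate the guarantee covers each synchronous segment only: at every `await` the delegate gives the scheduler back, so two saves can still interleave between awaits.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the question's code.
public static class SchedulerPairDemo
{
    // Starts `count` synchronous work items on `scheduler` and returns the largest
    // number of them that were observed running at the same time.
    public static int MaxObservedConcurrency(TaskScheduler scheduler, int count)
    {
        int running = 0, peak = 0;
        Task[] tasks = Enumerable.Range(0, count)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);
                // Raise `peak` to `now` if `now` is larger (lock-free max).
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
                Thread.Sleep(20); // stand-in for synchronous file work
                Interlocked.Decrement(ref running);
            }, CancellationToken.None, TaskCreationOptions.None, scheduler))
            .ToArray();
        Task.WaitAll(tasks);
        return peak;
    }
}
```

With `new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler` the result is always 1; with `ConcurrentScheduler` it may be higher, depending on the thread pool.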

Usage in MainPage.xaml.cs

private static string data = "vjdsskjfhkjsdhvnvndjfhjvkhdfjkgd";
private static string fileName = string.Empty;
private List<string> DataLstSample = new List<string>();
private ObservableCollection<string> TestResults = new ObservableCollection<string>();
private static string data1 = "hjhkjhkhkjhjkhkhkjhkjhkhjkhjkh";
List<Task> allTsk = new List<Task>();
private Random rand = new Random();
private string fileNameRand
{
    get
    {
        return rand.Next(100).ToString();
    }
}

public MainPage()
{
    InitializeComponent();

    for (int i = 0; i < 5; i ++)
    {
        DataLstSample.Add((i % 2) == 0 ? data : data1);
    }

}

private void Button_Click(object sender, RoutedEventArgs e)
{
    AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
}

public async void AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()
{
    TstRst.Text = "InProgress..";
    allTsk.Clear();

    foreach(var data in DataLstSample)
    {
        var fName = fileNameRand;

        var t = Task.Run(async () =>
        {
            await AppIsolatedStore.SaveAsync<string>(data, fName);
        });

        TestResults.Add(string.Format("Writing file name: {0}, data: {1}", fName, data));
        allTsk.Add(t);
    }

    await Task.WhenAll(allTsk);

    TstRst.Text = "Completed..";
}

Saving and loading data asynchronously

/// <summary>
/// Load object from file
/// </summary>
private static async Task<T> LoadData<T>(string fileName)
{
    T result = default(T);

    try
    {
        if (!string.IsNullOrWhiteSpace(fileName))
        {
            using (var file = new IsolatedStorageFileStream(fileName, FileMode.OpenOrCreate, _isolatedStore))
            {
                var data = await _file.ReadTextAsync(file);

                if (!string.IsNullOrWhiteSpace(data))
                {
                    result = JsonConvert.DeserializeObject<T>(data);
                }
            }
        }
    }
    catch (Exception ex)
    {
        //todo: log the megatron exception in a file
        Debug.WriteLine("AppIsolatedStore: LoadAsync : An error occurred while loading data : {0}", ex.Message);
    }
    finally
    {

    }

    return result;
}

/// <summary>
/// Save object to file
/// </summary>
private static async Task SaveData<T>(T obj, string fileName)
{
    try
    {
        if (obj != null && !string.IsNullOrWhiteSpace(fileName))
        {
            //Serialize object with JSON or XML serializer
            string storageString = JsonConvert.SerializeObject(obj);

            if (!string.IsNullOrWhiteSpace(storageString))
            {
                //Write content to file
                await _file.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);
            }
        }
    }
    catch (Exception ex)
    {
        //todo: log the megatron exception in a file
        Debug.WriteLine("AppIsolatedStore: SaveAsync : An error occurred while saving the data : {0}", ex.Message);
    }
    finally
    {
    }
}
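One thing worth checking in SaveData: unlike LoadData, the `IsolatedStorageFileStream` passed to `_file.WriteTextAsync` is never wrapped in `using`. The code of `FileIO.WriteTextAsync` is not shown, so it may dispose the stream itself; if it does not, every save leaves a file handle and its buffers open until finalization. A minimal sketch of a writer that always disposes the stream (`DisposingWriter` is a hypothetical name, not part of the question's `FileIO` class):

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical replacement for FileIO.WriteTextAsync.
public static class DisposingWriter
{
    // Writes `text` as UTF-8 (no BOM) and disposes the stream even if the write throws.
    public static async Task WriteTextAsync(Stream stream, string text)
    {
        // `using (stream)` also covers the case where the StreamWriter constructor throws;
        // disposing a stream twice is harmless.
        using (stream)
        using (var writer = new StreamWriter(stream, new UTF8Encoding(false)))
        {
            await writer.WriteAsync(text);
            await writer.FlushAsync();
        }
    }
}
```

In SaveData this would be called as `await DisposingWriter.WriteTextAsync(new IsolatedStorageFileStream(fileName, FileMode.Create, _isolatedStore), storageString);`.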

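Edit:

One reason for the memory exception was that the data string I used was too large. The string is here: http://1drv.ms/1QWSAsc

The second problem, though, is that even with small data it blocks the UI thread. Does the code execute any task on the UI thread?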

Have you tried the BoundedCapacity parameter on the ExecutionDataflowBlockOptions? The Introduction to TPL Dataflow mentions block capacity:

[...] bounding is useful in a dataflow network to avoid unbounded memory growth. This can be very important for reliability reasons if there’s a possibility that producers could end up generating data much faster than the consumers could process it...

I suggest trying this option to limit the queueing of items waiting to be processed, and seeing whether it helps with the memory issue.
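A minimal sketch of what bounding does, assuming the System.Threading.Tasks.Dataflow package and a hypothetical `BoundedCapacityDemo` class: with `BoundedCapacity = 1`, the item currently being processed still counts toward the bound, so `Post` returns `false` while the block is busy, whereas `SendAsync` waits until there is room. This also matters for the question's code, which sets `BoundedCapacity = 1` but uses `Post` and ignores its return value.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo, not part of the question's code.
public static class BoundedCapacityDemo
{
    // Reports what Post and SendAsync return when a block with BoundedCapacity = 1 is busy.
    public static async Task<(bool firstPost, bool secondPost, bool sendAccepted, int processed)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int processed = 0;

        var block = new ActionBlock<int>(async _ =>
        {
            await gate.Task; // keep the first item "in processing" until the gate opens
            processed++;     // MaxDegreeOfParallelism defaults to 1, so items run one at a time
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);       // accepted: the block now holds one item
        bool secondPost = block.Post(2);      // declined: the item being processed fills the bound
        Task<bool> send = block.SendAsync(3); // postponed until the first item finishes

        gate.SetResult(true);                 // let the first item complete
        bool sendAccepted = await send;

        block.Complete();
        await block.Completion;
        return (firstPost, secondPost, sendAccepted, processed);
    }
}
```

Here items 1 and 3 are processed and item 2 is silently dropped, which is why `SendAsync` (or checking `Post`'s result) is the safer choice for a bounded block.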

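No. You use the ConcurrentExclusiveSchedulerPair, which processes tasks on the default thread pool, and you start your tasks with Task.Run, so the problem isn't there. But there are two major threats in your code: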

var transBlock = new TransformBlock<string, T>
   (async fName =>
   {
       // process file here
   }, concurrentExecutionDataFlow);

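You really shouldn't create transBlock every time. The main idea of TPL Dataflow is that you create the blocks once and reuse them afterwards. So you should refactor your application to reduce the number of blocks you instantiate; creating a block per call is not how TPL Dataflow is meant to be used.

The other threat in your code is that you explicitly block the thread!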
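A minimal sketch of the create-once approach, assuming the System.Threading.Tasks.Dataflow package; `FileWriteQueue` and `WriteRequest` are hypothetical names, and `File.WriteAllTextAsync` stands in for the Windows Phone isolated-storage code so the sketch runs on desktop .NET. A single static `ActionBlock` performs every save, and each request carries a `TaskCompletionSource` so the caller can await its own write. This also avoids a subtle issue in the question's `SaveAsync`: because `TOutput` is `Task`, the async lambda binds to the `Func<TInput, TOutput>` overload, so the block emits the unfinished `Task` and `SaveAsync` never awaits the actual write.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: one long-lived block that performs every save, one at a time.
public static class FileWriteQueue
{
    // A save request plus a completion source the caller awaits.
    private sealed record WriteRequest(string Path, string Content, TaskCompletionSource<bool> Done);

    // Created once. MaxDegreeOfParallelism defaults to 1 and the block awaits the async
    // delegate before taking the next request, so writes never overlap.
    private static readonly ActionBlock<WriteRequest> Writer = new ActionBlock<WriteRequest>(
        async req =>
        {
            try
            {
                await File.WriteAllTextAsync(req.Path, req.Content);
                req.Done.SetResult(true);
            }
            catch (Exception ex)
            {
                req.Done.SetException(ex); // surface the failure to the caller
            }
        });

    public static async Task SaveAsync(string path, string content)
    {
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        // SendAsync waits if the block is bounded and full; Post would return false instead.
        if (!await Writer.SendAsync(new WriteRequest(path, content, done)))
            throw new InvalidOperationException("The writer block is no longer accepting requests.");
        await done.Task;
    }
}
```

Callers then use `await FileWriteQueue.SaveAsync(path, text);` from any thread; a matching read block (or a concurrent read path) could be built the same way.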

// Right here
await Task.WhenAll(allTsk);
TstRst.Text = "Completed..";

When an async void method called from a synchronous event handler awaits a task, the await captures the synchronization context by default, so everything after it runs back on the UI thread. First of all, async void should be avoided. Second, if you go async, you should be async all the way, so the event handler should be async too. Third, you can use a continuation on your task to update your UI, or use the current synchronization context.

So your code should look like this:

// store the sync context in a field of your page
SynchronizationContext syncContext = SynchronizationContext.Current;

// avoid async void :)
public async Task AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual()

// make the event handler async: this is the only exception to the "avoid async void" rule above
private async void Button_Click(object sender, RoutedEventArgs e)
{
    await AppIsolatedStore_TestInMultiThread_LstResultShouldBeEqual();
}

// asynchronously wait for the result without capturing the context
await Task.WhenAll(allTsk).ContinueWith(
  t => {
    // you can move this logic out to the main method
    syncContext.Post(new SendOrPostCallback(o =>
        {
            TstRst.Text = "Completed..";
        }), null);
  }
).ConfigureAwait(false);
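The statement that `await` captures the synchronization context by default can be checked without a phone UI. The sketch below installs a hypothetical `CountingContext` in place of the UI thread's context; it counts `Post` calls and runs callbacks on the thread pool. A plain `await` posts its continuation back to the captured context, while `ConfigureAwait(false)` resumes on the thread pool without posting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a UI context: counts Post calls, runs callbacks on the thread pool.
public sealed class CountingContext : SynchronizationContext
{
    public int PostCount;

    public override void Post(SendOrPostCallback d, object? state)
    {
        Interlocked.Increment(ref PostCount);
        ThreadPool.QueueUserWorkItem(_ =>
        {
            var previous = Current;
            SetSynchronizationContext(this); // the callback sees this context as Current
            try { d(state); }
            finally { SetSynchronizationContext(previous); } // don't leak it to the pool thread
        });
    }
}

public static class ContextCaptureDemo
{
    // Default await: the continuation is posted back to the captured context.
    public static async Task<bool> ResumesOnCapturedContext(SynchronizationContext ctx)
    {
        await Task.Delay(50);
        return SynchronizationContext.Current == ctx;
    }

    // ConfigureAwait(false): the continuation runs on the thread pool instead.
    public static async Task<bool> ResumesWithoutContext(SynchronizationContext ctx)
    {
        await Task.Delay(50).ConfigureAwait(false);
        return SynchronizationContext.Current == ctx;
    }

    // Calls `method` with `ctx` installed (as if on the UI thread), then waits for its result.
    public static bool Run(CountingContext ctx, Func<SynchronizationContext, Task<bool>> method)
    {
        var previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(ctx);
        Task<bool> task;
        try { task = method(ctx); } // the synchronous part runs here and captures ctx at the await
        finally { SynchronizationContext.SetSynchronizationContext(previous); }
        return task.GetAwaiter().GetResult(); // safe: ctx never needs this thread
    }
}
```

On a real UI thread the posted continuation would be queued to the dispatcher, which is why code after a plain `await` in the event handler runs on the UI thread.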