C# Parallel.for 仍然没有使用 concurrentList 包装器正确索引
C# Parallel.for still does not index right with a concurrentList wrapper
我有这个问题,我似乎无法让它工作...
我的列表有一个包装器以使其线程安全。
但是,索引在我的 parallel.for 循环中仍然不起作用。
public ConcurrentList<ConcurrentList<string>> multipleData(ConcurrentList<string> Adresses, ConcurrentList<string> PostalCodes)
{
ConcurrentList<ConcurrentList< string>> data = new ConcurrentList<ConcurrentList<string>>();
ServicePointManager.DefaultConnectionLimit = 100;
Parallel.For(0, Adresses.Count, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
WebClient web = new WebClient();
string textFromUrl = web.DownloadString("https://website1.com" + Adresses[i] + " " + PostalCodes[i]);
ConcurrentList<string> dataElement = new ConcurrentList<string>();
if (textFromUrl.Split(':', ',')[1] == "1")
{
textFromUrl = web.DownloadString("https://website2.com" + textFromUrl.Split('"', '"')[11]);
textFromUrl = web.DownloadString("https://website3.com" + textFromUrl.Split('"', '"')[3]);
ConcurrentList<RootObject> obj = JsonConvert.DeserializeObject<ConcurrentList<RootObject>>(textFromUrl);
dataElement.Add(obj[0].hoofdadres.geocodeerServiceZoekString);
dataElement.Add(obj[0].gebruiksdoel);
dataElement.Add(obj[0].begindatum.ToString());
}
else
{
dataElement.Add("empty");
dataElement.Add("empty");
dataElement.Add("empty");
}
data.Add(dataElement);
});
return data;
}
我把返回的数据导出到一个excelsheet。问题是返回的数据混淆了,所以 iteration0 不在 data[0] 中,iteration10 不在 data[10] 中。
我的并发列表包装器:
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
我该如何解决这个问题?我想用这个数据导出到excel,但是现在所有的数据都混在一起了
谢谢。
不是关于 ConcurrentList 包装器问题的实际答案,但因为您只是从外部资源加载数据,所以不需要并行执行这些数据。
使用 async-await
将以高效的方式做到这一点 - 几乎与更具可读性和可维护性的代码同时进行。
创建用于为一对地址和邮政编码检索数据的异步方法
private static HttpClient client1;
private static HttpClient client2;
private static HttpClient client3;
// HttpClient should be initialized once for whole application life-cycle
// Execute code below somewhere in your initialization
client1 = new HttpClient();
client1.BaseAddress = new Uri("https://website1.com");
client2 = new HttpClient();
client2.BaseAddress = new Uri("https://website2.com");
client3 = new HttpClient();
client3.BaseAddress = new Uri("https://website3.com");
public async Task<IEnumerable<string>> GetDataAsync(string address, string postalCode)
{
var path = $"{address} {postalCode}"; // build proper path for request
var textFrom1 = await client1.GetAsync(path); // Base address already added;
if (textFrom1.Split(':', ',')[1] != "1")
{
return Enumerable.Repeat("empty", 3);
}
var textFrom2 = await client2.GetAsync(textFrom1.Split('"', '"')[11]);
var textFrom3 = await client3.GetAsync(textFrom2.Split('"', '"')[3]);
var allData = JsonConvert.DeserializeObject<List<RootObject>>(textFrom3);
var item = allData.First();
return new[]
{
item.hoofdadres.geocodeerServiceZoekString,
item.gebruiksdoel,
item.begindatum.ToString()
};
}
然后在 "main" 函数中将所有内容合并到一个集合中
public async Task<IEnumerable<IEnumerable<string>> GetAll(
IEnumerable<string> adresses,
IEnumerable<string> postalCodes)
{
// Start all tasks one by one without waiting for responses
var tasks = addresses.Zip(postalCodes, (addr, code) => GetData(addr, code));
await Task.WhenAll(tasks);
return tasks.Select(task => task.Result).ToList();
}
使用上述方法 requests/responses 将以与并行相同的速度处理,但无需担心并发性和结果同步,因为所有操作都将在一个线程上执行。
我有这个问题,我似乎无法让它工作...
我的列表有一个包装器以使其线程安全。 但是,索引在我的 parallel.for 循环中仍然不起作用。
public ConcurrentList<ConcurrentList<string>> multipleData(ConcurrentList<string> Adresses, ConcurrentList<string> PostalCodes)
{
ConcurrentList<ConcurrentList< string>> data = new ConcurrentList<ConcurrentList<string>>();
ServicePointManager.DefaultConnectionLimit = 100;
Parallel.For(0, Adresses.Count, new ParallelOptions { MaxDegreeOfParallelism = 100 }, i =>
{
WebClient web = new WebClient();
string textFromUrl = web.DownloadString("https://website1.com" + Adresses[i] + " " + PostalCodes[i]);
ConcurrentList<string> dataElement = new ConcurrentList<string>();
if (textFromUrl.Split(':', ',')[1] == "1")
{
textFromUrl = web.DownloadString("https://website2.com" + textFromUrl.Split('"', '"')[11]);
textFromUrl = web.DownloadString("https://website3.com" + textFromUrl.Split('"', '"')[3]);
ConcurrentList<RootObject> obj = JsonConvert.DeserializeObject<ConcurrentList<RootObject>>(textFromUrl);
dataElement.Add(obj[0].hoofdadres.geocodeerServiceZoekString);
dataElement.Add(obj[0].gebruiksdoel);
dataElement.Add(obj[0].begindatum.ToString());
}
else
{
dataElement.Add("empty");
dataElement.Add("empty");
dataElement.Add("empty");
}
data.Add(dataElement);
});
return data;
}
我把返回的数据导出到一个excelsheet。问题是返回的数据混淆了,所以 iteration0 不在 data[0] 中,iteration10 不在 data[10] 中。
我的并发列表包装器:
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
我该如何解决这个问题?我想用这个数据导出到excel,但是现在所有的数据都混在一起了
谢谢。
不是关于 ConcurrentList 包装器问题的实际答案,但因为您只是从外部资源加载数据,所以不需要并行执行这些数据。
使用 async-await
将以高效的方式做到这一点 - 几乎与更具可读性和可维护性的代码同时进行。
创建用于为一对地址和邮政编码检索数据的异步方法
private static HttpClient client1;
private static HttpClient client2;
private static HttpClient client3;
// HttpClient should be initialized once for whole application life-cycle
// Execute code below somewhere in your initialization
client1 = new HttpClient();
client1.BaseAddress = new Uri("https://website1.com");
client2 = new HttpClient();
client2.BaseAddress = new Uri("https://website2.com");
client3 = new HttpClient();
client3.BaseAddress = new Uri("https://website3.com");
public async Task<IEnumerable<string>> GetDataAsync(string address, string postalCode)
{
var path = $"{address} {postalCode}"; // build proper path for request
var textFrom1 = await client1.GetAsync(path); // Base address already added;
if (textFrom1.Split(':', ',')[1] != "1")
{
return Enumerable.Repeat("empty", 3);
}
var textFrom2 = await client2.GetAsync(textFrom1.Split('"', '"')[11]);
var textFrom3 = await client3.GetAsync(textFrom2.Split('"', '"')[3]);
var allData = JsonConvert.DeserializeObject<List<RootObject>>(textFrom3);
var item = allData.First();
return new[]
{
item.hoofdadres.geocodeerServiceZoekString,
item.gebruiksdoel,
item.begindatum.ToString()
};
}
然后在 "main" 函数中将所有内容合并到一个集合中
public async Task<IEnumerable<IEnumerable<string>> GetAll(
IEnumerable<string> adresses,
IEnumerable<string> postalCodes)
{
// Start all tasks one by one without waiting for responses
var tasks = addresses.Zip(postalCodes, (addr, code) => GetData(addr, code));
await Task.WhenAll(tasks);
return tasks.Select(task => task.Result).ToList();
}
使用上述方法 requests/responses 将以与并行相同的速度处理,但无需担心并发性和结果同步,因为所有操作都将在一个线程上执行。