使用多个任务时避免重复数据
Avoiding duplicate data when using multiple tasks
我正在尝试 运行 我的代码中的生产者消费者模式,以使其更快。我的流程是,我想同时使用多个任务 运行ning,以获取数据,将其包装在自定义 class 产品中,然后将产品添加到队列中。然后消费者通过单个任务通过Entity Framework将其保存在DB中。我通过在数据库中插入大约 1000 个产品来测试我现在拥有的代码,然后 运行ning sql 查询以检查重复行。
SQL Query result
如图所示,大约有 30 种产品在数据库中出现了不止一次。
这是我的代码:
public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);
var consumer = Task.Run(() =>
{
foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
{
InsertProductInDB(readyProduct, context);
}
});
var producers = Enumerable.Range(0, 25)
.Select(_ => Task.Run(() =>
{
foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
{
Product product = new Product();
byte[] unconvertedByteArray;
string xml;
string url = @"https://data.Icecat.biz/export/freexml.int/en/";
unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
xml = Encoding.UTF8.GetString(unconvertedByteArray);
XmlDocument xmlDoc = new XmlDocument();
xmlDoc.LoadXml(xml);
GetProductDetails(product, xmlDoc, index);
XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
product.FeaturesLink = GetProductFeatures(product, nodeList);
nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
product.Images = GetProductImages(nodeList);
productsQueue.Add(product);
}
})).ToArray();
Task.WaitAll(producers);
productsQueue.CompleteAdding();
consumer.Wait();
}
总而言之,我的问题是我该怎么做才能避免这种情况发生?
为避免重复尝试向产品 table 添加新字段并将其命名为 Code
,其中包含产品 xml 的哈希值。
并确保为其添加独特的属性。
这样,由于唯一的 constraint/index.
,任何添加重复项的尝试都会失败
public static string Hash(string input)
{
var hash = new SHA1Managed().ComputeHash(Encoding.UTF8.GetBytes(input));
return string.Concat(hash.Select(b => b.ToString("x2")));
}
public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);
var consumer = Task.Run(() =>
{
foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
{
InsertProductInDB(readyProduct, context);
}
});
var producers = Enumerable.Range(0, 25)
.Select(_ => Task.Run(() =>
{
foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
{
Product product = new Product();
byte[] unconvertedByteArray;
string xml;
string url = @"https://data.Icecat.biz/export/freexml.int/en/";
unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
xml = Encoding.UTF8.GetString(unconvertedByteArray);
XmlDocument xmlDoc = new XmlDocument();
xmlDoc.LoadXml(xml);
GetProductDetails(product, xmlDoc, index);
XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
product.FeaturesLink = GetProductFeatures(product, nodeList);
nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
product.Images = GetProductImages(nodeList);
product.Code= Hash(xml);
productsQueue.Add(product);
}
})).ToArray();
Task.WaitAll(producers);
productsQueue.CompleteAdding();
consumer.Wait();
}
我正在尝试 运行 我的代码中的生产者消费者模式,以使其更快。我的流程是,我想同时使用多个任务 运行ning,以获取数据,将其包装在自定义 class 产品中,然后将产品添加到队列中。然后消费者通过单个任务通过Entity Framework将其保存在DB中。我通过在数据库中插入大约 1000 个产品来测试我现在拥有的代码,然后 运行ning sql 查询以检查重复行。 SQL Query result
如图所示,大约有 30 种产品在数据库中出现了不止一次。
这是我的代码:
public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);
var consumer = Task.Run(() =>
{
foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
{
InsertProductInDB(readyProduct, context);
}
});
var producers = Enumerable.Range(0, 25)
.Select(_ => Task.Run(() =>
{
foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
{
Product product = new Product();
byte[] unconvertedByteArray;
string xml;
string url = @"https://data.Icecat.biz/export/freexml.int/en/";
unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
xml = Encoding.UTF8.GetString(unconvertedByteArray);
XmlDocument xmlDoc = new XmlDocument();
xmlDoc.LoadXml(xml);
GetProductDetails(product, xmlDoc, index);
XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
product.FeaturesLink = GetProductFeatures(product, nodeList);
nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
product.Images = GetProductImages(nodeList);
productsQueue.Add(product);
}
})).ToArray();
Task.WaitAll(producers);
productsQueue.CompleteAdding();
consumer.Wait();
}
总而言之,我的问题是我该怎么做才能避免这种情况发生?
为避免重复尝试向产品 table 添加新字段并将其命名为 Code
,其中包含产品 xml 的哈希值。
并确保为其添加独特的属性。
这样,由于唯一的 constraint/index.
public static string Hash(string input)
{
var hash = new SHA1Managed().ComputeHash(Encoding.UTF8.GetBytes(input));
return string.Concat(hash.Select(b => b.ToString("x2")));
}
public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);
var consumer = Task.Run(() =>
{
foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
{
InsertProductInDB(readyProduct, context);
}
});
var producers = Enumerable.Range(0, 25)
.Select(_ => Task.Run(() =>
{
foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
{
Product product = new Product();
byte[] unconvertedByteArray;
string xml;
string url = @"https://data.Icecat.biz/export/freexml.int/en/";
unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
xml = Encoding.UTF8.GetString(unconvertedByteArray);
XmlDocument xmlDoc = new XmlDocument();
xmlDoc.LoadXml(xml);
GetProductDetails(product, xmlDoc, index);
XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
product.FeaturesLink = GetProductFeatures(product, nodeList);
nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
product.Images = GetProductImages(nodeList);
product.Code= Hash(xml);
productsQueue.Add(product);
}
})).ToArray();
Task.WaitAll(producers);
productsQueue.CompleteAdding();
consumer.Wait();
}