Avoiding duplicate data when using multiple tasks

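I am trying to use the producer-consumer pattern in my code to make it faster. The flow is: several tasks run concurrently to fetch data, wrap it in a custom Product class, and add each product to a queue. A single consumer task then saves the products to the database through Entity Framework. I tested my current code by inserting about 1000 products into the database and then running a SQL query to check for duplicate rows.

[Image: SQL query result]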

As the image shows, about 30 products appear in the database more than once.

Here is my code:

public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
    BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
    BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);

    var consumer = Task.Run(() =>
    {
        foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
        {
            InsertProductInDB(readyProduct, context);
        }
    });

    var producers = Enumerable.Range(0, 25)
        .Select(_ => Task.Run(() =>
        {
            foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
            {
                Product product = new Product();
                byte[] unconvertedByteArray;
                string xml;
                string url = @"https://data.Icecat.biz/export/freexml.int/en/";

                unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
                xml = Encoding.UTF8.GetString(unconvertedByteArray);
                XmlDocument xmlDoc = new XmlDocument();
                xmlDoc.LoadXml(xml);

                GetProductDetails(product, xmlDoc, index);

                XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
                product.FeaturesLink = GetProductFeatures(product, nodeList);

                nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
                product.Images = GetProductImages(nodeList);
                productsQueue.Add(product);
            }
        })).ToArray();

    Task.WaitAll(producers);

    productsQueue.CompleteAdding();

    consumer.Wait();
}

In short, my question is: what can I do to prevent this from happening?

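To avoid duplicates, try adding a new field named Code to the Product table that holds a hash of the product's XML, and make sure to put a unique constraint/index on it. That way, any attempt to add a duplicate will fail because of the unique constraint/index.
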
// Returns the SHA-1 hash of the input as a lowercase hex string.
public static string Hash(string input)
{
    using (var sha1 = SHA1.Create())
    {
        var hash = sha1.ComputeHash(Encoding.UTF8.GetBytes(input));
        return string.Concat(hash.Select(b => b.ToString("x2")));
    }
}
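
As a quick check of what ends up in the Code field, here is a minimal, self-contained sketch. The class name HashDemo and the sample XML strings are made up for illustration. It shows that identical XML always produces the same 40-character hex string, while different XML produces a different one, which is what lets a unique index on Code reject repeats. Note that this only catches byte-identical XML.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

public static class HashDemo
{
    // Same algorithm as the Hash method above: SHA-1 over the UTF-8 bytes,
    // rendered as lowercase hex.
    public static string Hash(string input)
    {
        using (var sha1 = SHA1.Create())
        {
            byte[] hash = sha1.ComputeHash(Encoding.UTF8.GetBytes(input));
            return string.Concat(hash.Select(b => b.ToString("x2")));
        }
    }

    public static void Main()
    {
        // Hypothetical sample documents, not real Icecat data.
        string a = Hash("<Product ID=\"1\"/>");
        string b = Hash("<Product ID=\"1\"/>");
        string c = Hash("<Product ID=\"2\"/>");

        Console.WriteLine(a.Length); // 40 (SHA-1 is 20 bytes, two hex digits each)
        Console.WriteLine(a == b);   // True: identical input, identical code
        Console.WriteLine(a == c);   // False: different input, different code
    }
}
```

Because the value always has the same length, the Code column can be given a fixed size, which is convenient when defining the unique index.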

public static void GetAllProductsFromIndexes_AndPutInDB(List<IndexModel> indexes, ProductContext context)
{
    BlockingCollection<IndexModel> inputQueue = CreateInputQueue(indexes);
    BlockingCollection<Product> productsQueue = new BlockingCollection<Product>(500);

    var consumer = Task.Run(() =>
    {
        foreach (Product readyProduct in productsQueue.GetConsumingEnumerable())
        {
            InsertProductInDB(readyProduct, context);
        }
    });

    var producers = Enumerable.Range(0, 25)
        .Select(_ => Task.Run(() =>
        {
            foreach (IndexModel index in inputQueue.GetConsumingEnumerable())
            {
                Product product = new Product();
                byte[] unconvertedByteArray;
                string xml;
                string url = @"https://data.Icecat.biz/export/freexml.int/en/";

                unconvertedByteArray = DownloadIcecatFile(index.IndexNumber.ToString() + ".xml", url);
                xml = Encoding.UTF8.GetString(unconvertedByteArray);
                XmlDocument xmlDoc = new XmlDocument();
                xmlDoc.LoadXml(xml);

                GetProductDetails(product, xmlDoc, index);

                XmlNodeList nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductFeature"));
                product.FeaturesLink = GetProductFeatures(product, nodeList);

                nodeList = (xmlDoc.SelectNodes("ICECAT-interface/Product/ProductGallery/ProductPicture"));
                product.Images = GetProductImages(nodeList);
                // Hash of the raw XML; a repeated product collides on the unique index.
                product.Code = Hash(xml);
                productsQueue.Add(product);
            }
        })).ToArray();

    Task.WaitAll(producers);

    productsQueue.CompleteAdding();

    consumer.Wait();
}
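
With the unique index in place, a duplicate insert fails inside the consumer, so the insert code needs to handle that failure. As an optional complement (this is a different technique from the unique index, not a replacement for it), duplicates can also be filtered in memory before they reach the queue. The sketch below is an assumption-laden illustration: DedupDemo, Item, and Run are hypothetical names, plain strings stand in for the hashed XML, and a list stands in for the database. It uses ConcurrentDictionary.TryAdd, which returns true for only one of the tasks that race on the same key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DedupDemo
{
    // Hypothetical stand-in for Product; only Code matters here.
    public sealed class Item
    {
        public string Code { get; set; }
    }

    public static List<Item> Run(IEnumerable<string> codes, int producerCount)
    {
        // Input queue, filled up front and marked complete so producers can finish.
        var input = new BlockingCollection<string>();
        foreach (string code in codes) input.Add(code);
        input.CompleteAdding();

        var output = new BlockingCollection<Item>(500);
        var seen = new ConcurrentDictionary<string, byte>(); // codes already queued
        var saved = new List<Item>();                        // stands in for the database

        // Single consumer: the only task that touches 'saved'.
        var consumer = Task.Run(() =>
        {
            foreach (Item item in output.GetConsumingEnumerable())
                saved.Add(item);
        });

        var producers = Enumerable.Range(0, producerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (string code in input.GetConsumingEnumerable())
                {
                    // TryAdd succeeds for exactly one task per distinct code.
                    if (seen.TryAdd(code, 0))
                        output.Add(new Item { Code = code });
                }
            })).ToArray();

        Task.WaitAll(producers);
        output.CompleteAdding();
        consumer.Wait();
        return saved;
    }

    public static void Main()
    {
        var codes = new[] { "a", "b", "a", "c", "b", "a" };
        List<Item> saved = Run(codes, 4);
        Console.WriteLine(saved.Count); // 3
    }
}
```

The in-memory filter only knows about codes seen during the current run, so the unique index remains the safeguard against rows that are already in the table.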