不使用所有内核将文件导入 SQLite 数据库

Importing files into SQLite database not using all cores

上下文

我正在借助 EntityFrameworkCore 将 IMDB 数据库文件导入 SQLite 数据库。事实上,有两个文件,titles.basics 和 titles.akas(通过其电影 ID 链接到基础知识)。

起初,我有一个线程从基础知识中读取行并循环遍历 akas 直到它更改 ID。虽然,那里有一个问题,最重要的是,它太慢了。因此,我决定创建一个多线程代码来同时读取两个文件,并创建另一个将 akas 与适当的电影相结合的代码。

我目前正在导入,所以我仍然不知道我的问题是否已解决(可能是)。不过,对我来说还是太慢了。

问题

combining 部分仍然很慢,但更重要的是,我可以看到我的过程只使用了 CPU 的大约 12%,这仅对应于 1/总使用量的 8,我有 8 个物理内核。所以,看起来这个过程真的只使用了 1 个核心。

我在这里不提供任何代码,因为拥有最少的可测试代码并不意味着什么。不过,您可以在这里看到两个版本:

https://cints.net/public/Imdb-MultiThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        private readonly string temporaryFolder = @"c:\temp\";
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        MediaRecognizerContext db = new();

        private IQueryable<MetaMovie> imdbMovies = null;

        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach(var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;
                if (!File.Exists(compressedFile) || !File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today))
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);
            var buffer = new byte[1024];
            int nRead;
            while ((nRead = await csStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                await fd.WriteAsync(buffer, 0, nRead);
            }
        }

        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;


            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);
            var dbLock = new SemaphoreSlim(1);
            var akasLock = new SemaphoreSlim(1);
            var currentTitlesAkasLock = new SemaphoreSlim(1);
            var associateLock = new SemaphoreSlim(1);

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;

                var titles = new ConcurrentDictionary<string, MetaMovie>();
                var readTitles = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(titlesFile), (titleLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                        dbLock.Wait();
                        MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).Include(m => m.Titles).FirstOrDefault();
                        dbLock.Release();
                        if (metaMovie == null)
                        {
                            int totalMinutes = -1;
                            if (!int.TryParse(movieInfos[7], out totalMinutes))
                            {
                                totalMinutes = -1;
                            }
                            metaMovie = new MetaMovie
                            {
                                ExternalId = movieInfos[0],
                                MetaSource = nameof(Imdb),
                                MovieType = movieInfos[1],
                                Title = movieInfos[3],
                                TotalMinutes = totalMinutes,
                                Genres = movieInfos[8]
                            };
                            metaMovie.Titles = new List<MetaTitle>();
                            if (int.TryParse(movieInfos[5], out int startYear))
                            {
                                metaMovie.StartYear = new DateTime(startYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.StartYear = new DateTime(9999, 1, 1);
                            }
                            if (int.TryParse(movieInfos[6], out int endYear))
                            {
                                metaMovie.EndYear = new DateTime(endYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.EndYear = metaMovie.StartYear;
                            }
                        }

                        titles.TryAdd(metaMovie.ExternalId, metaMovie);
                    });
                });

                var akas = new Dictionary<string, List<MetaTitle>>();
                var currentTitlesAkas = new ConcurrentDictionary<string, int>();
                var readAkas = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(akasFile), (akaLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        currentTitlesAkasLock.Wait();
                        var titleInfos = akaLine.Split("\t", StringSplitOptions.None);
                        var externalId = titleInfos[0];
                        if (!currentTitlesAkas.ContainsKey(externalId))
                        {
                            currentTitlesAkas.TryAdd(externalId, 1);
                        }
                        else
                        {
                            currentTitlesAkas[externalId]++;
                        }
                        currentTitlesAkasLock.Release();

                        var metaTitle = new MetaTitle
                        {
                            MetaMovie = null,
                            Text = titleInfos[2],
                            Region = titleInfos[3],
                            Language = titleInfos[4]
                        };

                        akasLock.Wait();
                        List<MetaTitle> titleAkas;
                        if (!akas.ContainsKey(externalId))
                        {
                            titleAkas = new List<MetaTitle>();
                            akas.Add(externalId, titleAkas);
                        }
                        else
                        {
                            titleAkas = akas[externalId];
                        }
                        titleAkas.Add(metaTitle);
                        akasLock.Release();

                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas[externalId]--;
                        currentTitlesAkasLock.Release();
                    });
                });

                var savingCounter = 0;
                var associate = Task.Factory.StartNew(() =>
                {
                    Parallel.For(1, Environment.ProcessorCount * 10, async (_) =>
                    {
                        var isAssociating = true;
                        do
                        {
                            var externalId = string.Empty;
                            var currentTitleAkaRemoved = false;
                            currentTitlesAkasLock.Wait();
                            foreach (var curExternalId in currentTitlesAkas.Keys.OrderBy(t => t))
                            {
                                if (currentTitlesAkas[curExternalId] == 0)
                                {
                                    externalId = curExternalId;
                                    break;
                                }
                            }
                            if (externalId != String.Empty)
                            {
                                currentTitleAkaRemoved = currentTitlesAkas.TryRemove(externalId, out int useless0); // Removing so other threads won't take it
                            }
                            isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                            currentTitlesAkasLock.Release();

                            if (String.IsNullOrEmpty(externalId) || !currentTitleAkaRemoved) continue;

                            if (titles.TryGetValue(externalId, out MetaMovie metaMovie))
                            {
                                akasLock.Wait();
                                var titleAkas = akas[externalId];
                                akas.Remove(externalId);
                                akasLock.Release();

                                var changedMovie = false;
                                var movieAkas = metaMovie.Titles.Select(t => t).ToList(); // Clone list
                                foreach (var metaTitle in titleAkas)
                                {
                                    var existingTitle = movieAkas.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                                    if (existingTitle == null)
                                    {
                                        changedMovie = true;
                                        metaMovie.Titles.Add(metaTitle);
                                    }
                                    else
                                    {
                                        movieAkas.Remove(existingTitle);
                                    }
                                }
                                foreach (var movieTitle in movieAkas)
                                {
                                    changedMovie = true;
                                    metaMovie.Titles.Remove(movieTitle);
                                }

                                dbLock.Wait();
                                if (metaMovie.Id == 0)
                                {
                                    db.Add(metaMovie);
                                }
                                else if (changedMovie)
                                {
                                    db.Update(metaMovie);
                                }
                                dbLock.Release();

                                currentTitlesAkasLock.Wait();
                                currentTitlesAkas.TryRemove(externalId, out int uselessOut); // Free memory
                                isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                                currentTitlesAkasLock.Release();

                                titles.TryRemove(externalId, out MetaMovie uselessOut2); // Free memory

                                associateLock.Wait();
                                savingCounter++;
                                var localSavingCounter = savingCounter;
                                associateLock.Release();

                                if (localSavingCounter != 0 && localSavingCounter % 1000 == 0)
                                {
                                    var ttt = currentTitlesAkas.Where(t => t.Value > 0);
                                    dbLock.Wait();
                                    await db.SaveChangesAsync();
                                    dbLock.Release();
                                    Console.WriteLine("Saved " + localSavingCounter);
                                }
                            }
                            else if (!readTitles.IsCompleted) // If reading titles is not ended, then maybe it was not read yet... otherwise, it doesn't exist
                            {
                                currentTitlesAkasLock.Wait();
                                currentTitlesAkas.TryAdd(externalId, 0); // Readd because still no movie associated
                                currentTitlesAkasLock.Release();
                            }
                        } while (isAssociating);
                    });
                });

                Task.WaitAll(readTitles, readAkas, associate);
                await db.SaveChangesAsync();
            }
        }

        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var movie = await ExtractInfosAsync(directory);
            if (movie == null) return null;

            if (imdbMovies == null)
            {
                imdbMovies = db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
            }

            return FindCorrespondances(imdbMovies, movie);
        }
    }
}

https://cints.net/public/Imdb-SingleThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        private readonly string temporaryFolder = @"c:\temp\";
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        MediaRecognizerContext db = new();

        private IQueryable<MetaMovie> imdbMovies = null;

        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach(var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;
                if (!File.Exists(compressedFile) || !File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today))
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);
            var buffer = new byte[1024];
            int nRead;
            while ((nRead = await csStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                await fd.WriteAsync(buffer, 0, nRead);
            }
        }

        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;

            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);
            var titlesLines = File.ReadLines(titlesFile);
            var akasLines = File.ReadLines(akasFile);


            var titlesIterator = titlesLines.GetEnumerator();
            titlesIterator.MoveNext(); // Skip columns headers

            var akasIterator = akasLines.GetEnumerator();
            akasIterator.MoveNext();
            akasIterator.MoveNext(); // Done twice to skip columns headers
            var currentAka = akasIterator.Current;
            var savingCounter = 0;

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;
                while (titlesIterator.MoveNext())
                {
                    var titleLine = titlesIterator.Current;
                    var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                    MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).FirstOrDefault();
                    var isNewMovie = false;
                    if (metaMovie == null)
                    {
                        int totalMinutes = -1;
                        if (!int.TryParse(movieInfos[7], out totalMinutes))
                        {
                            totalMinutes = -1;
                        }
                        isNewMovie = true;
                        metaMovie = new MetaMovie
                        {
                            ExternalId = movieInfos[0],
                            MetaSource = nameof(Imdb),
                            MovieType = movieInfos[1],
                            Title = movieInfos[3],
                            TotalMinutes = totalMinutes,
                            Genres = movieInfos[8]
                        };
                        metaMovie.Titles = new List<MetaTitle>();
                        if (int.TryParse(movieInfos[5], out int startYear))
                        {
                            metaMovie.StartYear = new DateTime(startYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.StartYear = new DateTime(9999, 1, 1);
                        }
                        if (int.TryParse(movieInfos[6], out int endYear))
                        {
                            metaMovie.EndYear = new DateTime(endYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.EndYear = metaMovie.StartYear;
                        }
                    }

                    var movieAkasIds = metaMovie.Titles.Select(t => t.Id).ToList();
                    var titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
                    while (currentAka != null && int.Parse(titleInfos[0][2..]) <= int.Parse(metaMovie.ExternalId[2..]))
                    {
                        if (titleInfos[0] == metaMovie.ExternalId)
                        {
                            var metaTitle = new MetaTitle
                            {
                                MetaMovie = metaMovie,
                                Text = titleInfos[2],
                                Region = titleInfos[3],
                                Language = titleInfos[4]
                            };

                            var existingTitle = metaMovie.Titles.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                            if (existingTitle == null)
                            {
                                metaMovie.Titles.Add(metaTitle);
                            }
                            else
                            {
                                movieAkasIds.Remove(existingTitle.Id);
                            }
                        }
                        else
                        {
                            var a = 1;
                        }

                        akasIterator.MoveNext();
                        currentAka = akasIterator.Current;
                        titleInfos = currentAka.Split("\t", StringSplitOptions.None);
                    }

                    foreach(var movieTitleId in movieAkasIds)
                    {
                        metaMovie.Titles.Remove(metaMovie.Titles.Where(t => t.Id == movieTitleId).FirstOrDefault());
                    }

                    if (isNewMovie)
                    {
                        db.Add(metaMovie);
                    }
                    else
                    {
                        db.Update(metaMovie);
                    }

                    savingCounter++;
                    if (savingCounter % 10000 == 0)
                    {
                        await db.SaveChangesAsync();
                        Console.WriteLine("Saved " + savingCounter);
                    }
                }

                await db.SaveChangesAsync();
            }
        }

        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var movie = await ExtractInfosAsync(directory);
            if (movie == null) return null;

            if (imdbMovies == null)
            {
                imdbMovies = db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
            }

            return FindCorrespondances(imdbMovies, movie);
        }
    }
}

在多线程版本中,较慢的部分在方法 LoadMetaDataAsync 中,更准确地说在 var associate = Task.Factory.StartNew(() => 代码部分。

这正在开发和清理中,拆分将在我有适当的 result/speed 后完成。

案件结案。我回到了单线程版本,发现了我最初的问题(我的代码假设文件是​​有序的,它们是部分有序的)。

感谢所有参与的人。