将文件导入 SQLite 数据库时没有用上所有 CPU 内核
Importing files into SQLite database not using all cores
上下文
我正在借助 EntityFrameworkCore 将 IMDB 数据文件导入 SQLite 数据库。实际上有两个文件:title.basics 和 title.akas(后者通过电影 ID 关联到 basics)。
起初,我用单个线程从 basics 中逐行读取,并循环遍历 akas,直到其 ID 发生变化。不过这种做法存在一个问题,而且最重要的是它太慢了。因此,我决定编写多线程代码来同时读取两个文件,并用另一段代码把 akas 与对应的电影关联起来。
我目前正在导入,所以我仍然不知道我的问题是否已解决(可能是)。不过,对我来说还是太慢了。
问题
关联(combining)部分仍然很慢,但更重要的是,我发现进程只使用了大约 12% 的 CPU,这只相当于总量的 1/8,而我有 8 个物理内核。所以,看起来这个进程实际上只使用了 1 个内核。
我没有在这里提供最小可测试代码,因为对这个问题而言那样做没有意义。不过,您可以在这里看到两个版本:
https://cints.net/public/Imdb-MultiThread.cs.txt
using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
class Imdb : MediaSource
{
// File name of the compressed titles dataset; appended to baseUrl and temporaryFolder.
private const string TITLES_FILE = "title.basics.tsv.gz";
// File name of the compressed akas dataset; appended to baseUrl and temporaryFolder.
private const string AKAS_FILE = "title.akas.tsv.gz";
// Folder prefix for downloaded and decompressed files. Paths are built by plain string
// concatenation, so the trailing separator is required.
private readonly string temporaryFolder = @"c:\temp\";
// URL prefix the dataset file names are appended to when downloading.
private readonly string baseUrl = "https://datasets.imdbws.com/";
// Client used by GatherFileAsync to download the archives.
private readonly WebClient webClient = new();
// Context queried by FindMediasAsync; LoadMetaDataAsync creates its own separate context.
MediaRecognizerContext db = new();
// Cached movie query, assigned on the first FindMediasAsync call that needs it.
private IQueryable<MetaMovie> imdbMovies = null;
/// <summary>
/// Downloads every dataset archive that is missing from the temporary folder or whose
/// last write date is not today.
/// </summary>
/// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
private async Task<bool> GatherFilesAsync()
{
    var gatheredAny = false;

    foreach (var archiveName in new string[] { AKAS_FILE, TITLES_FILE })
    {
        var archivePath = temporaryFolder + archiveName;

        // An archive is reused only when it exists and was written today.
        var isUpToDate = File.Exists(archivePath)
            && File.GetLastWriteTime(archivePath).Date.Equals(DateTime.Today);
        if (isUpToDate)
        {
            continue;
        }

        await GatherFileAsync(archiveName);
        gatheredAny = true;
    }

    return gatheredAny;
}
/// <summary>
/// Downloads one compressed dataset file into the temporary folder and writes a
/// decompressed copy next to it.
/// </summary>
/// <param name="fileName">Archive file name; appended to both <c>baseUrl</c> and <c>temporaryFolder</c>.</param>
/// <returns>A task that completes once the decompressed copy has been written.</returns>
private async Task GatherFileAsync(string fileName)
{
    var compressedFile = temporaryFolder + fileName;
    // Dropping the last extension of the archive name gives the decompressed file name.
    var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);

    await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

    using Stream fd = File.Create(uncompressedFile);
    using Stream fs = File.OpenRead(compressedFile);
    using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

    // The framework copy replaces a hand-rolled read/write loop over a 1 KB buffer.
    await csStream.CopyToAsync(fd);
}
/// <summary>
/// Imports the decompressed titles and akas files into the database using three concurrent stages.
/// </summary>
/// <remarks>
/// <para>
/// <c>readTitles</c> parses the titles file into <c>titles</c>, reusing the stored
/// <see cref="MetaMovie"/> when one with the same external id already exists.
/// <c>readAkas</c> parses the akas file into <c>akas</c>, grouped by external id, and keeps a
/// per-id count of insertions in flight in <c>currentTitlesAkas</c>.
/// <c>associate</c> repeatedly takes an id whose in-flight count is zero, reconciles the movie's
/// titles with the parsed akas and registers the movie with the context.
/// </para>
/// <para>
/// The single <c>db</c> context is shared by all stages and every use of it is serialized through
/// <c>dbLock</c>, so the database work itself never runs in parallel. The associate workers also
/// poll: each loop iteration sorts all pending ids while holding <c>currentTitlesAkasLock</c>.
/// </para>
/// </remarks>
/// <returns>A task that completes when all stages have finished and the last changes are saved.</returns>
private async Task LoadMetaDataAsync()
{
    //return; //TODO: Remove this line

    //TODO: Reactivate this line
    //if (!await GatherFilesAsync()) return;

    var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
    var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);

    // Each semaphore is used as a mutex for the shared state named in its identifier.
    var dbLock = new SemaphoreSlim(1);
    var akasLock = new SemaphoreSlim(1);
    var currentTitlesAkasLock = new SemaphoreSlim(1);
    var associateLock = new SemaphoreSlim(1);

    using (var db = new MediaRecognizerContext())
    {
        db.ChangeTracker.AutoDetectChangesEnabled = false;

        // Stage 1: external id -> movie (loaded from the database or freshly built).
        var titles = new ConcurrentDictionary<string, MetaMovie>();
        var readTitles = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(File.ReadLines(titlesFile), (titleLine, _, readingIndex) =>
            {
                if (readingIndex == 0) return; // Skipping columns titles line

                var movieInfos = titleLine.Split("\t", StringSplitOptions.None);

                MetaMovie metaMovie;
                dbLock.Wait();
                try
                {
                    metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).Include(m => m.Titles).FirstOrDefault();
                }
                finally
                {
                    // Released even when the query throws; otherwise every other worker blocks forever.
                    dbLock.Release();
                }

                if (metaMovie == null)
                {
                    // TryParse zeroes its out argument on failure, hence the explicit -1 fallback.
                    if (!int.TryParse(movieInfos[7], out int totalMinutes))
                    {
                        totalMinutes = -1;
                    }

                    metaMovie = new MetaMovie
                    {
                        ExternalId = movieInfos[0],
                        MetaSource = nameof(Imdb),
                        MovieType = movieInfos[1],
                        Title = movieInfos[3],
                        TotalMinutes = totalMinutes,
                        Genres = movieInfos[8]
                    };
                    metaMovie.Titles = new List<MetaTitle>();

                    // A start year that does not parse is stored as year 9999.
                    if (int.TryParse(movieInfos[5], out int startYear))
                    {
                        metaMovie.StartYear = new DateTime(startYear, 1, 1);
                    }
                    else
                    {
                        metaMovie.StartYear = new DateTime(9999, 1, 1);
                    }

                    // An end year that does not parse falls back to the start year.
                    if (int.TryParse(movieInfos[6], out int endYear))
                    {
                        metaMovie.EndYear = new DateTime(endYear, 1, 1);
                    }
                    else
                    {
                        metaMovie.EndYear = metaMovie.StartYear;
                    }
                }

                titles.TryAdd(metaMovie.ExternalId, metaMovie);
            });
        });

        // Stage 2: external id -> parsed akas. currentTitlesAkas counts, per id, the aka
        // insertions still in flight; an id is ready for association when its count is 0.
        var akas = new Dictionary<string, List<MetaTitle>>();
        var currentTitlesAkas = new ConcurrentDictionary<string, int>();
        var readAkas = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(File.ReadLines(akasFile), (akaLine, _, readingIndex) =>
            {
                if (readingIndex == 0) return; // Skipping columns titles line

                currentTitlesAkasLock.Wait();
                var titleInfos = akaLine.Split("\t", StringSplitOptions.None);
                var externalId = titleInfos[0];
                if (!currentTitlesAkas.ContainsKey(externalId))
                {
                    currentTitlesAkas.TryAdd(externalId, 1);
                }
                else
                {
                    currentTitlesAkas[externalId]++;
                }
                currentTitlesAkasLock.Release();

                var metaTitle = new MetaTitle
                {
                    MetaMovie = null,
                    Text = titleInfos[2],
                    Region = titleInfos[3],
                    Language = titleInfos[4]
                };

                akasLock.Wait();
                if (!akas.TryGetValue(externalId, out var titleAkas))
                {
                    titleAkas = new List<MetaTitle>();
                    akas.Add(externalId, titleAkas);
                }
                titleAkas.Add(metaTitle);
                akasLock.Release();

                currentTitlesAkasLock.Wait();
                currentTitlesAkas[externalId]--;
                currentTitlesAkasLock.Release();
            });
        });

        // Stage 3: merge akas into their movie and register the movie with the context.
        var savingCounter = 0;
        var associate = Task.Factory.StartNew(() =>
        {
            // The worker is deliberately synchronous. Parallel.For only accepts Action delegates,
            // so an async lambda would be "async void": the loop (and this task) would be reported
            // as finished at the first await while the workers kept using the context.
            Parallel.For(1, Environment.ProcessorCount * 10, _ =>
            {
                var isAssociating = true;
                do
                {
                    var externalId = string.Empty;
                    var currentTitleAkaRemoved = false;

                    currentTitlesAkasLock.Wait();
                    // Take the smallest id that has no aka insertion in flight.
                    foreach (var curExternalId in currentTitlesAkas.Keys.OrderBy(t => t))
                    {
                        if (currentTitlesAkas[curExternalId] == 0)
                        {
                            externalId = curExternalId;
                            break;
                        }
                    }
                    if (externalId != String.Empty)
                    {
                        currentTitleAkaRemoved = currentTitlesAkas.TryRemove(externalId, out _); // Removing so other threads won't take it
                    }
                    isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                    currentTitlesAkasLock.Release();

                    if (String.IsNullOrEmpty(externalId) || !currentTitleAkaRemoved) continue;

                    if (titles.TryGetValue(externalId, out MetaMovie metaMovie))
                    {
                        akasLock.Wait();
                        var titleAkas = akas[externalId];
                        akas.Remove(externalId);
                        akasLock.Release();

                        var changedMovie = false;
                        // Titles still in this copy after the loop were not found in the file.
                        var movieAkas = metaMovie.Titles.Select(t => t).ToList(); // Clone list
                        foreach (var metaTitle in titleAkas)
                        {
                            var existingTitle = movieAkas.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                            if (existingTitle == null)
                            {
                                changedMovie = true;
                                metaMovie.Titles.Add(metaTitle);
                            }
                            else
                            {
                                movieAkas.Remove(existingTitle);
                            }
                        }
                        foreach (var movieTitle in movieAkas)
                        {
                            changedMovie = true;
                            metaMovie.Titles.Remove(movieTitle);
                        }

                        dbLock.Wait();
                        try
                        {
                            // Id 0 is treated as "not stored yet".
                            if (metaMovie.Id == 0)
                            {
                                db.Add(metaMovie);
                            }
                            else if (changedMovie)
                            {
                                db.Update(metaMovie);
                            }
                        }
                        finally
                        {
                            dbLock.Release();
                        }

                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas.TryRemove(externalId, out _); // Free memory
                        isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                        currentTitlesAkasLock.Release();
                        titles.TryRemove(externalId, out _); // Free memory

                        associateLock.Wait();
                        savingCounter++;
                        var localSavingCounter = savingCounter;
                        associateLock.Release();

                        // Flush the tracked changes every 1000 associated movies.
                        if (localSavingCounter != 0 && localSavingCounter % 1000 == 0)
                        {
                            dbLock.Wait();
                            try
                            {
                                db.SaveChanges();
                            }
                            finally
                            {
                                dbLock.Release();
                            }
                            Console.WriteLine("Saved " + localSavingCounter);
                        }
                    }
                    else if (!readTitles.IsCompleted) // If reading titles is not ended, then maybe it was not read yet... otherwise, it doesn't exist
                    {
                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas.TryAdd(externalId, 0); // Readd because still no movie associated
                        currentTitlesAkasLock.Release();
                    }
                } while (isAssociating);
            });
        });

        Task.WaitAll(readTitles, readAkas, associate);
        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Runs the metadata import, extracts the movie information for <paramref name="directory"/>
/// and looks up its correspondences among the stored movies of this source.
/// </summary>
/// <param name="directory">Directory passed to <c>ExtractInfosAsync</c>.</param>
/// <returns>
/// The result of <c>FindCorrespondances</c>, or <c>null</c> when <c>ExtractInfosAsync</c> yields no movie.
/// </returns>
public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
{
    await LoadMetaDataAsync();

    var movie = await ExtractInfosAsync(directory);
    if (movie == null)
    {
        return null;
    }

    // The query is created on first use and kept in the field for later calls.
    imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");

    return FindCorrespondances(imdbMovies, movie);
}
}
}
https://cints.net/public/Imdb-SingleThread.cs.txt
using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
class Imdb : MediaSource
{
// File name of the compressed titles dataset; appended to baseUrl and temporaryFolder.
private const string TITLES_FILE = "title.basics.tsv.gz";
// File name of the compressed akas dataset; appended to baseUrl and temporaryFolder.
private const string AKAS_FILE = "title.akas.tsv.gz";
// Folder prefix for downloaded and decompressed files. Paths are built by plain string
// concatenation, so the trailing separator is required.
private readonly string temporaryFolder = @"c:\temp\";
// URL prefix the dataset file names are appended to when downloading.
private readonly string baseUrl = "https://datasets.imdbws.com/";
// Client used by GatherFileAsync to download the archives.
private readonly WebClient webClient = new();
// Context queried by FindMediasAsync; LoadMetaDataAsync creates its own separate context.
MediaRecognizerContext db = new();
// Cached movie query, assigned on the first FindMediasAsync call that needs it.
private IQueryable<MetaMovie> imdbMovies = null;
/// <summary>
/// Downloads every dataset archive that is missing from the temporary folder or whose
/// last write date is not today.
/// </summary>
/// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
private async Task<bool> GatherFilesAsync()
{
    var gatheredAny = false;

    foreach (var archiveName in new string[] { AKAS_FILE, TITLES_FILE })
    {
        var archivePath = temporaryFolder + archiveName;

        // An archive is reused only when it exists and was written today.
        var isUpToDate = File.Exists(archivePath)
            && File.GetLastWriteTime(archivePath).Date.Equals(DateTime.Today);
        if (isUpToDate)
        {
            continue;
        }

        await GatherFileAsync(archiveName);
        gatheredAny = true;
    }

    return gatheredAny;
}
/// <summary>
/// Downloads one compressed dataset file into the temporary folder and writes a
/// decompressed copy next to it.
/// </summary>
/// <param name="fileName">Archive file name; appended to both <c>baseUrl</c> and <c>temporaryFolder</c>.</param>
/// <returns>A task that completes once the decompressed copy has been written.</returns>
private async Task GatherFileAsync(string fileName)
{
    var compressedFile = temporaryFolder + fileName;
    // Dropping the last extension of the archive name gives the decompressed file name.
    var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);

    await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

    using Stream fd = File.Create(uncompressedFile);
    using Stream fs = File.OpenRead(compressedFile);
    using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

    // The framework copy replaces a hand-rolled read/write loop over a 1 KB buffer.
    await csStream.CopyToAsync(fd);
}
/// <summary>
/// Imports the decompressed titles and akas files into the database by walking both files in lockstep.
/// </summary>
/// <remarks>
/// For every title line the akas file is advanced while the numeric part of the aka's id is not
/// greater than the title's. Akas whose id equals the title's id are reconciled with the movie's
/// titles; any other aka that is passed over is dropped. Every aka therefore reaches its movie
/// only when both files are sorted by id. Changes are saved every 10000 titles and once at the end.
/// </remarks>
/// <returns>A task that completes when every title line was processed and the last changes are saved.</returns>
private async Task LoadMetaDataAsync()
{
    //return; //TODO: Remove this line

    //TODO: Reactivate this line
    //if (!await GatherFilesAsync()) return;

    var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
    var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);

    // The enumerators keep the underlying files open, so they are disposed when the method exits.
    using var titlesIterator = File.ReadLines(titlesFile).GetEnumerator();
    titlesIterator.MoveNext(); // Skip columns headers

    using var akasIterator = File.ReadLines(akasFile).GetEnumerator();
    // The first MoveNext lands on the header line, the second on the first data line.
    // currentAka is null when the file has no data line.
    var currentAka = akasIterator.MoveNext() && akasIterator.MoveNext() ? akasIterator.Current : null;
    var savingCounter = 0;

    using (var db = new MediaRecognizerContext())
    {
        db.ChangeTracker.AutoDetectChangesEnabled = false;

        while (titlesIterator.MoveNext())
        {
            var titleLine = titlesIterator.Current;
            var movieInfos = titleLine.Split("\t", StringSplitOptions.None);

            // NOTE(review): this query does not explicitly load Titles; confirm the collection is
            // populated for stored movies, since it is enumerated below.
            MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).FirstOrDefault();
            var isNewMovie = false;
            if (metaMovie == null)
            {
                // TryParse zeroes its out argument on failure, hence the explicit -1 fallback.
                if (!int.TryParse(movieInfos[7], out int totalMinutes))
                {
                    totalMinutes = -1;
                }

                isNewMovie = true;
                metaMovie = new MetaMovie
                {
                    ExternalId = movieInfos[0],
                    MetaSource = nameof(Imdb),
                    MovieType = movieInfos[1],
                    Title = movieInfos[3],
                    TotalMinutes = totalMinutes,
                    Genres = movieInfos[8]
                };
                metaMovie.Titles = new List<MetaTitle>();

                // A start year that does not parse is stored as year 9999.
                if (int.TryParse(movieInfos[5], out int startYear))
                {
                    metaMovie.StartYear = new DateTime(startYear, 1, 1);
                }
                else
                {
                    metaMovie.StartYear = new DateTime(9999, 1, 1);
                }

                // An end year that does not parse falls back to the start year.
                if (int.TryParse(movieInfos[6], out int endYear))
                {
                    metaMovie.EndYear = new DateTime(endYear, 1, 1);
                }
                else
                {
                    metaMovie.EndYear = metaMovie.StartYear;
                }
            }

            // Ids of the titles the movie had before this pass; those still listed after the
            // aka loop were not found in the file and are removed below.
            var movieAkasIds = metaMovie.Titles.Select(t => t.Id).ToList();

            // Ids are compared by the number that follows their two-character prefix.
            var titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
            while (currentAka != null && int.Parse(titleInfos[0][2..]) <= int.Parse(metaMovie.ExternalId[2..]))
            {
                if (titleInfos[0] == metaMovie.ExternalId)
                {
                    var metaTitle = new MetaTitle
                    {
                        MetaMovie = metaMovie,
                        Text = titleInfos[2],
                        Region = titleInfos[3],
                        Language = titleInfos[4]
                    };

                    var existingTitle = metaMovie.Titles.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                    if (existingTitle == null)
                    {
                        metaMovie.Titles.Add(metaTitle);
                    }
                    else
                    {
                        movieAkasIds.Remove(existingTitle.Id);
                    }
                }
                // An aka with a smaller id than the current title is skipped without being stored.

                // Advance; at end of file currentAka becomes null, which ends this loop for the
                // current title and for every following one.
                currentAka = akasIterator.MoveNext() ? akasIterator.Current : null;
                titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
            }

            foreach (var movieTitleId in movieAkasIds)
            {
                metaMovie.Titles.Remove(metaMovie.Titles.Where(t => t.Id == movieTitleId).FirstOrDefault());
            }

            if (isNewMovie)
            {
                db.Add(metaMovie);
            }
            else
            {
                db.Update(metaMovie);
            }

            savingCounter++;
            if (savingCounter % 10000 == 0)
            {
                await db.SaveChangesAsync();
                Console.WriteLine("Saved " + savingCounter);
            }
        }

        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Runs the metadata import, extracts the movie information for <paramref name="directory"/>
/// and looks up its correspondences among the stored movies of this source.
/// </summary>
/// <param name="directory">Directory passed to <c>ExtractInfosAsync</c>.</param>
/// <returns>
/// The result of <c>FindCorrespondances</c>, or <c>null</c> when <c>ExtractInfosAsync</c> yields no movie.
/// </returns>
public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
{
    await LoadMetaDataAsync();

    var movie = await ExtractInfosAsync(directory);
    if (movie == null)
    {
        return null;
    }

    // The query is created on first use and kept in the field for later calls.
    imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");

    return FindCorrespondances(imdbMovies, movie);
}
}
}
在多线程版本中,较慢的部分在方法 LoadMetaDataAsync
中,更准确地说在 var associate = Task.Factory.StartNew(() =>
代码部分。
这正在开发和清理中,拆分将在我有适当的 result/speed 后完成。
问题已解决。我改回了单线程版本,并找到了最初的问题所在(我的代码假设文件是有序的,而它们实际上只是部分有序)。
感谢所有参与的人。
上下文
我正在借助 EntityFrameworkCore 将 IMDB 数据文件导入 SQLite 数据库。实际上有两个文件:title.basics 和 title.akas(后者通过电影 ID 关联到 basics)。
起初,我用单个线程从 basics 中逐行读取,并循环遍历 akas,直到其 ID 发生变化。不过这种做法存在一个问题,而且最重要的是它太慢了。因此,我决定编写多线程代码来同时读取两个文件,并用另一段代码把 akas 与对应的电影关联起来。
我目前正在导入,所以我仍然不知道我的问题是否已解决(可能是)。不过,对我来说还是太慢了。
问题
关联(combining)部分仍然很慢,但更重要的是,我发现进程只使用了大约 12% 的 CPU,这只相当于总量的 1/8,而我有 8 个物理内核。所以,看起来这个进程实际上只使用了 1 个内核。
我没有在这里提供最小可测试代码,因为对这个问题而言那样做没有意义。不过,您可以在这里看到两个版本:
https://cints.net/public/Imdb-MultiThread.cs.txt
using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
class Imdb : MediaSource
{
// File name of the compressed titles dataset; appended to baseUrl and temporaryFolder.
private const string TITLES_FILE = "title.basics.tsv.gz";
// File name of the compressed akas dataset; appended to baseUrl and temporaryFolder.
private const string AKAS_FILE = "title.akas.tsv.gz";
// Folder prefix for downloaded and decompressed files. Paths are built by plain string
// concatenation, so the trailing separator is required.
private readonly string temporaryFolder = @"c:\temp\";
// URL prefix the dataset file names are appended to when downloading.
private readonly string baseUrl = "https://datasets.imdbws.com/";
// Client used by GatherFileAsync to download the archives.
private readonly WebClient webClient = new();
// Context queried by FindMediasAsync; LoadMetaDataAsync creates its own separate context.
MediaRecognizerContext db = new();
// Cached movie query, assigned on the first FindMediasAsync call that needs it.
private IQueryable<MetaMovie> imdbMovies = null;
/// <summary>
/// Downloads every dataset archive that is missing from the temporary folder or whose
/// last write date is not today.
/// </summary>
/// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
private async Task<bool> GatherFilesAsync()
{
    var gatheredAny = false;

    foreach (var archiveName in new string[] { AKAS_FILE, TITLES_FILE })
    {
        var archivePath = temporaryFolder + archiveName;

        // An archive is reused only when it exists and was written today.
        var isUpToDate = File.Exists(archivePath)
            && File.GetLastWriteTime(archivePath).Date.Equals(DateTime.Today);
        if (isUpToDate)
        {
            continue;
        }

        await GatherFileAsync(archiveName);
        gatheredAny = true;
    }

    return gatheredAny;
}
/// <summary>
/// Downloads one compressed dataset file into the temporary folder and writes a
/// decompressed copy next to it.
/// </summary>
/// <param name="fileName">Archive file name; appended to both <c>baseUrl</c> and <c>temporaryFolder</c>.</param>
/// <returns>A task that completes once the decompressed copy has been written.</returns>
private async Task GatherFileAsync(string fileName)
{
    var compressedFile = temporaryFolder + fileName;
    // Dropping the last extension of the archive name gives the decompressed file name.
    var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);

    await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

    using Stream fd = File.Create(uncompressedFile);
    using Stream fs = File.OpenRead(compressedFile);
    using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

    // The framework copy replaces a hand-rolled read/write loop over a 1 KB buffer.
    await csStream.CopyToAsync(fd);
}
/// <summary>
/// Imports the decompressed titles and akas files into the database using three concurrent stages.
/// </summary>
/// <remarks>
/// <para>
/// <c>readTitles</c> parses the titles file into <c>titles</c>, reusing the stored
/// <see cref="MetaMovie"/> when one with the same external id already exists.
/// <c>readAkas</c> parses the akas file into <c>akas</c>, grouped by external id, and keeps a
/// per-id count of insertions in flight in <c>currentTitlesAkas</c>.
/// <c>associate</c> repeatedly takes an id whose in-flight count is zero, reconciles the movie's
/// titles with the parsed akas and registers the movie with the context.
/// </para>
/// <para>
/// The single <c>db</c> context is shared by all stages and every use of it is serialized through
/// <c>dbLock</c>, so the database work itself never runs in parallel. The associate workers also
/// poll: each loop iteration sorts all pending ids while holding <c>currentTitlesAkasLock</c>.
/// </para>
/// </remarks>
/// <returns>A task that completes when all stages have finished and the last changes are saved.</returns>
private async Task LoadMetaDataAsync()
{
    //return; //TODO: Remove this line

    //TODO: Reactivate this line
    //if (!await GatherFilesAsync()) return;

    var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
    var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);

    // Each semaphore is used as a mutex for the shared state named in its identifier.
    var dbLock = new SemaphoreSlim(1);
    var akasLock = new SemaphoreSlim(1);
    var currentTitlesAkasLock = new SemaphoreSlim(1);
    var associateLock = new SemaphoreSlim(1);

    using (var db = new MediaRecognizerContext())
    {
        db.ChangeTracker.AutoDetectChangesEnabled = false;

        // Stage 1: external id -> movie (loaded from the database or freshly built).
        var titles = new ConcurrentDictionary<string, MetaMovie>();
        var readTitles = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(File.ReadLines(titlesFile), (titleLine, _, readingIndex) =>
            {
                if (readingIndex == 0) return; // Skipping columns titles line

                var movieInfos = titleLine.Split("\t", StringSplitOptions.None);

                MetaMovie metaMovie;
                dbLock.Wait();
                try
                {
                    metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).Include(m => m.Titles).FirstOrDefault();
                }
                finally
                {
                    // Released even when the query throws; otherwise every other worker blocks forever.
                    dbLock.Release();
                }

                if (metaMovie == null)
                {
                    // TryParse zeroes its out argument on failure, hence the explicit -1 fallback.
                    if (!int.TryParse(movieInfos[7], out int totalMinutes))
                    {
                        totalMinutes = -1;
                    }

                    metaMovie = new MetaMovie
                    {
                        ExternalId = movieInfos[0],
                        MetaSource = nameof(Imdb),
                        MovieType = movieInfos[1],
                        Title = movieInfos[3],
                        TotalMinutes = totalMinutes,
                        Genres = movieInfos[8]
                    };
                    metaMovie.Titles = new List<MetaTitle>();

                    // A start year that does not parse is stored as year 9999.
                    if (int.TryParse(movieInfos[5], out int startYear))
                    {
                        metaMovie.StartYear = new DateTime(startYear, 1, 1);
                    }
                    else
                    {
                        metaMovie.StartYear = new DateTime(9999, 1, 1);
                    }

                    // An end year that does not parse falls back to the start year.
                    if (int.TryParse(movieInfos[6], out int endYear))
                    {
                        metaMovie.EndYear = new DateTime(endYear, 1, 1);
                    }
                    else
                    {
                        metaMovie.EndYear = metaMovie.StartYear;
                    }
                }

                titles.TryAdd(metaMovie.ExternalId, metaMovie);
            });
        });

        // Stage 2: external id -> parsed akas. currentTitlesAkas counts, per id, the aka
        // insertions still in flight; an id is ready for association when its count is 0.
        var akas = new Dictionary<string, List<MetaTitle>>();
        var currentTitlesAkas = new ConcurrentDictionary<string, int>();
        var readAkas = Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(File.ReadLines(akasFile), (akaLine, _, readingIndex) =>
            {
                if (readingIndex == 0) return; // Skipping columns titles line

                currentTitlesAkasLock.Wait();
                var titleInfos = akaLine.Split("\t", StringSplitOptions.None);
                var externalId = titleInfos[0];
                if (!currentTitlesAkas.ContainsKey(externalId))
                {
                    currentTitlesAkas.TryAdd(externalId, 1);
                }
                else
                {
                    currentTitlesAkas[externalId]++;
                }
                currentTitlesAkasLock.Release();

                var metaTitle = new MetaTitle
                {
                    MetaMovie = null,
                    Text = titleInfos[2],
                    Region = titleInfos[3],
                    Language = titleInfos[4]
                };

                akasLock.Wait();
                if (!akas.TryGetValue(externalId, out var titleAkas))
                {
                    titleAkas = new List<MetaTitle>();
                    akas.Add(externalId, titleAkas);
                }
                titleAkas.Add(metaTitle);
                akasLock.Release();

                currentTitlesAkasLock.Wait();
                currentTitlesAkas[externalId]--;
                currentTitlesAkasLock.Release();
            });
        });

        // Stage 3: merge akas into their movie and register the movie with the context.
        var savingCounter = 0;
        var associate = Task.Factory.StartNew(() =>
        {
            // The worker is deliberately synchronous. Parallel.For only accepts Action delegates,
            // so an async lambda would be "async void": the loop (and this task) would be reported
            // as finished at the first await while the workers kept using the context.
            Parallel.For(1, Environment.ProcessorCount * 10, _ =>
            {
                var isAssociating = true;
                do
                {
                    var externalId = string.Empty;
                    var currentTitleAkaRemoved = false;

                    currentTitlesAkasLock.Wait();
                    // Take the smallest id that has no aka insertion in flight.
                    foreach (var curExternalId in currentTitlesAkas.Keys.OrderBy(t => t))
                    {
                        if (currentTitlesAkas[curExternalId] == 0)
                        {
                            externalId = curExternalId;
                            break;
                        }
                    }
                    if (externalId != String.Empty)
                    {
                        currentTitleAkaRemoved = currentTitlesAkas.TryRemove(externalId, out _); // Removing so other threads won't take it
                    }
                    isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                    currentTitlesAkasLock.Release();

                    if (String.IsNullOrEmpty(externalId) || !currentTitleAkaRemoved) continue;

                    if (titles.TryGetValue(externalId, out MetaMovie metaMovie))
                    {
                        akasLock.Wait();
                        var titleAkas = akas[externalId];
                        akas.Remove(externalId);
                        akasLock.Release();

                        var changedMovie = false;
                        // Titles still in this copy after the loop were not found in the file.
                        var movieAkas = metaMovie.Titles.Select(t => t).ToList(); // Clone list
                        foreach (var metaTitle in titleAkas)
                        {
                            var existingTitle = movieAkas.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                            if (existingTitle == null)
                            {
                                changedMovie = true;
                                metaMovie.Titles.Add(metaTitle);
                            }
                            else
                            {
                                movieAkas.Remove(existingTitle);
                            }
                        }
                        foreach (var movieTitle in movieAkas)
                        {
                            changedMovie = true;
                            metaMovie.Titles.Remove(movieTitle);
                        }

                        dbLock.Wait();
                        try
                        {
                            // Id 0 is treated as "not stored yet".
                            if (metaMovie.Id == 0)
                            {
                                db.Add(metaMovie);
                            }
                            else if (changedMovie)
                            {
                                db.Update(metaMovie);
                            }
                        }
                        finally
                        {
                            dbLock.Release();
                        }

                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas.TryRemove(externalId, out _); // Free memory
                        isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                        currentTitlesAkasLock.Release();
                        titles.TryRemove(externalId, out _); // Free memory

                        associateLock.Wait();
                        savingCounter++;
                        var localSavingCounter = savingCounter;
                        associateLock.Release();

                        // Flush the tracked changes every 1000 associated movies.
                        if (localSavingCounter != 0 && localSavingCounter % 1000 == 0)
                        {
                            dbLock.Wait();
                            try
                            {
                                db.SaveChanges();
                            }
                            finally
                            {
                                dbLock.Release();
                            }
                            Console.WriteLine("Saved " + localSavingCounter);
                        }
                    }
                    else if (!readTitles.IsCompleted) // If reading titles is not ended, then maybe it was not read yet... otherwise, it doesn't exist
                    {
                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas.TryAdd(externalId, 0); // Readd because still no movie associated
                        currentTitlesAkasLock.Release();
                    }
                } while (isAssociating);
            });
        });

        Task.WaitAll(readTitles, readAkas, associate);
        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Runs the metadata import, extracts the movie information for <paramref name="directory"/>
/// and looks up its correspondences among the stored movies of this source.
/// </summary>
/// <param name="directory">Directory passed to <c>ExtractInfosAsync</c>.</param>
/// <returns>
/// The result of <c>FindCorrespondances</c>, or <c>null</c> when <c>ExtractInfosAsync</c> yields no movie.
/// </returns>
public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
{
    await LoadMetaDataAsync();

    var movie = await ExtractInfosAsync(directory);
    if (movie == null)
    {
        return null;
    }

    // The query is created on first use and kept in the field for later calls.
    imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");

    return FindCorrespondances(imdbMovies, movie);
}
}
}
https://cints.net/public/Imdb-SingleThread.cs.txt
using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
class Imdb : MediaSource
{
// File name of the compressed titles dataset; appended to baseUrl and temporaryFolder.
private const string TITLES_FILE = "title.basics.tsv.gz";
// File name of the compressed akas dataset; appended to baseUrl and temporaryFolder.
private const string AKAS_FILE = "title.akas.tsv.gz";
// Folder prefix for downloaded and decompressed files. Paths are built by plain string
// concatenation, so the trailing separator is required.
private readonly string temporaryFolder = @"c:\temp\";
// URL prefix the dataset file names are appended to when downloading.
private readonly string baseUrl = "https://datasets.imdbws.com/";
// Client used by GatherFileAsync to download the archives.
private readonly WebClient webClient = new();
// Context queried by FindMediasAsync; LoadMetaDataAsync creates its own separate context.
MediaRecognizerContext db = new();
// Cached movie query, assigned on the first FindMediasAsync call that needs it.
private IQueryable<MetaMovie> imdbMovies = null;
/// <summary>
/// Downloads every dataset archive that is missing from the temporary folder or whose
/// last write date is not today.
/// </summary>
/// <returns><c>true</c> when at least one archive was downloaded; otherwise <c>false</c>.</returns>
private async Task<bool> GatherFilesAsync()
{
    var gatheredAny = false;

    foreach (var archiveName in new string[] { AKAS_FILE, TITLES_FILE })
    {
        var archivePath = temporaryFolder + archiveName;

        // An archive is reused only when it exists and was written today.
        var isUpToDate = File.Exists(archivePath)
            && File.GetLastWriteTime(archivePath).Date.Equals(DateTime.Today);
        if (isUpToDate)
        {
            continue;
        }

        await GatherFileAsync(archiveName);
        gatheredAny = true;
    }

    return gatheredAny;
}
/// <summary>
/// Downloads one compressed dataset file into the temporary folder and writes a
/// decompressed copy next to it.
/// </summary>
/// <param name="fileName">Archive file name; appended to both <c>baseUrl</c> and <c>temporaryFolder</c>.</param>
/// <returns>A task that completes once the decompressed copy has been written.</returns>
private async Task GatherFileAsync(string fileName)
{
    var compressedFile = temporaryFolder + fileName;
    // Dropping the last extension of the archive name gives the decompressed file name.
    var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);

    await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

    using Stream fd = File.Create(uncompressedFile);
    using Stream fs = File.OpenRead(compressedFile);
    using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);

    // The framework copy replaces a hand-rolled read/write loop over a 1 KB buffer.
    await csStream.CopyToAsync(fd);
}
/// <summary>
/// Imports the decompressed titles and akas files into the database by walking both files in lockstep.
/// </summary>
/// <remarks>
/// For every title line the akas file is advanced while the numeric part of the aka's id is not
/// greater than the title's. Akas whose id equals the title's id are reconciled with the movie's
/// titles; any other aka that is passed over is dropped. Every aka therefore reaches its movie
/// only when both files are sorted by id. Changes are saved every 10000 titles and once at the end.
/// </remarks>
/// <returns>A task that completes when every title line was processed and the last changes are saved.</returns>
private async Task LoadMetaDataAsync()
{
    //return; //TODO: Remove this line

    //TODO: Reactivate this line
    //if (!await GatherFilesAsync()) return;

    var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
    var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);

    // The enumerators keep the underlying files open, so they are disposed when the method exits.
    using var titlesIterator = File.ReadLines(titlesFile).GetEnumerator();
    titlesIterator.MoveNext(); // Skip columns headers

    using var akasIterator = File.ReadLines(akasFile).GetEnumerator();
    // The first MoveNext lands on the header line, the second on the first data line.
    // currentAka is null when the file has no data line.
    var currentAka = akasIterator.MoveNext() && akasIterator.MoveNext() ? akasIterator.Current : null;
    var savingCounter = 0;

    using (var db = new MediaRecognizerContext())
    {
        db.ChangeTracker.AutoDetectChangesEnabled = false;

        while (titlesIterator.MoveNext())
        {
            var titleLine = titlesIterator.Current;
            var movieInfos = titleLine.Split("\t", StringSplitOptions.None);

            // NOTE(review): this query does not explicitly load Titles; confirm the collection is
            // populated for stored movies, since it is enumerated below.
            MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).FirstOrDefault();
            var isNewMovie = false;
            if (metaMovie == null)
            {
                // TryParse zeroes its out argument on failure, hence the explicit -1 fallback.
                if (!int.TryParse(movieInfos[7], out int totalMinutes))
                {
                    totalMinutes = -1;
                }

                isNewMovie = true;
                metaMovie = new MetaMovie
                {
                    ExternalId = movieInfos[0],
                    MetaSource = nameof(Imdb),
                    MovieType = movieInfos[1],
                    Title = movieInfos[3],
                    TotalMinutes = totalMinutes,
                    Genres = movieInfos[8]
                };
                metaMovie.Titles = new List<MetaTitle>();

                // A start year that does not parse is stored as year 9999.
                if (int.TryParse(movieInfos[5], out int startYear))
                {
                    metaMovie.StartYear = new DateTime(startYear, 1, 1);
                }
                else
                {
                    metaMovie.StartYear = new DateTime(9999, 1, 1);
                }

                // An end year that does not parse falls back to the start year.
                if (int.TryParse(movieInfos[6], out int endYear))
                {
                    metaMovie.EndYear = new DateTime(endYear, 1, 1);
                }
                else
                {
                    metaMovie.EndYear = metaMovie.StartYear;
                }
            }

            // Ids of the titles the movie had before this pass; those still listed after the
            // aka loop were not found in the file and are removed below.
            var movieAkasIds = metaMovie.Titles.Select(t => t.Id).ToList();

            // Ids are compared by the number that follows their two-character prefix.
            var titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
            while (currentAka != null && int.Parse(titleInfos[0][2..]) <= int.Parse(metaMovie.ExternalId[2..]))
            {
                if (titleInfos[0] == metaMovie.ExternalId)
                {
                    var metaTitle = new MetaTitle
                    {
                        MetaMovie = metaMovie,
                        Text = titleInfos[2],
                        Region = titleInfos[3],
                        Language = titleInfos[4]
                    };

                    var existingTitle = metaMovie.Titles.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                    if (existingTitle == null)
                    {
                        metaMovie.Titles.Add(metaTitle);
                    }
                    else
                    {
                        movieAkasIds.Remove(existingTitle.Id);
                    }
                }
                // An aka with a smaller id than the current title is skipped without being stored.

                // Advance; at end of file currentAka becomes null, which ends this loop for the
                // current title and for every following one.
                currentAka = akasIterator.MoveNext() ? akasIterator.Current : null;
                titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
            }

            foreach (var movieTitleId in movieAkasIds)
            {
                metaMovie.Titles.Remove(metaMovie.Titles.Where(t => t.Id == movieTitleId).FirstOrDefault());
            }

            if (isNewMovie)
            {
                db.Add(metaMovie);
            }
            else
            {
                db.Update(metaMovie);
            }

            savingCounter++;
            if (savingCounter % 10000 == 0)
            {
                await db.SaveChangesAsync();
                Console.WriteLine("Saved " + savingCounter);
            }
        }

        await db.SaveChangesAsync();
    }
}
/// <summary>
/// Runs the metadata import, extracts the movie information for <paramref name="directory"/>
/// and looks up its correspondences among the stored movies of this source.
/// </summary>
/// <param name="directory">Directory passed to <c>ExtractInfosAsync</c>.</param>
/// <returns>
/// The result of <c>FindCorrespondances</c>, or <c>null</c> when <c>ExtractInfosAsync</c> yields no movie.
/// </returns>
public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
{
    await LoadMetaDataAsync();

    var movie = await ExtractInfosAsync(directory);
    if (movie == null)
    {
        return null;
    }

    // The query is created on first use and kept in the field for later calls.
    imdbMovies ??= db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");

    return FindCorrespondances(imdbMovies, movie);
}
}
}
在多线程版本中,较慢的部分在方法 LoadMetaDataAsync
中,更准确地说在 var associate = Task.Factory.StartNew(() =>
代码部分。
这正在开发和清理中,拆分将在我有适当的 result/speed 后完成。
问题已解决。我改回了单线程版本,并找到了最初的问题所在(我的代码假设文件是有序的,而它们实际上只是部分有序)。
感谢所有参与的人。