用于从站点抓取数据的多线程 c# 控制台应用程序
Multithreaded c# console app to scrape data from sites
我编写了一个应用程序,可以遍历我们自己的属性并抓取数据。为确保我不会 运行 通过相同的 URL,我使用 MySQL 数据库来存储 URL,并在处理后对其进行标记。所有这些都是在一个线程中完成的,如果我只有几千个条目也没关系。但是我有几十万个条目需要解析,所以我需要对代码进行更改(我通常是多线程方面的新手)。我找到了一个示例并试图复制样式但似乎不起作用。有人知道以下代码有什么问题吗?
编辑:抱歉并不是要让人们猜测问题,而是我愚蠢地包含了例外。这是例外
"System.InValidCastException: 'Specified cast is not valid.'"
当我启动该过程时,它从数据库中收集 URLs,然后从不访问 DoWork 方法
//这将从数据库中获取条目
List<Mappings> items = bot.GetUrlsToProcess(100);
if (items != null)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Worker.Done = new Worker.DoneDelegate(WorkerDone);
foreach (var item in items)
{
urls.Add(item.Url);
WaitingTasks.Enqueue(new Task(id => new Worker().DoWork((int)id, item.Url, token), item.Url, token));
}
LaunchTasks();
}
static async void LaunchTasks()
{
// keep checking until we're done
while ((WaitingTasks.Count > 0) || (RunningTasks.Count > 0))
{
// launch tasks when there's room
while ((WaitingTasks.Count > 0) && (RunningTasks.Count < MaxRunningTasks))
{
Task task = WaitingTasks.Dequeue();
lock (RunningTasks) RunningTasks.Add((int)task.AsyncState, task);
task.Start();
}
UpdateConsole();
await Task.Delay(300); // wait before checking again
}
UpdateConsole(); // all done
}
static void UpdateConsole()
{
Console.Write(string.Format("\rwaiting: {0,3:##0} running: {1,3:##0} ", WaitingTasks.Count, RunningTasks.Count));
}
static void WorkerDone(int id)
{
lock (RunningTasks) RunningTasks.Remove(id);
}
public class Worker
{
public delegate void DoneDelegate(int taskId);
public static DoneDelegate Done { private get; set; }
public async void DoWork(object id, string url, CancellationToken token)
{
if (token.IsCancellationRequested) return;
Content obj;
try
{
int tries = 0;
bool IsUrlProcessed = true;
DateTime dtStart = DateTime.Now;
string articleDate = string.Empty;
try
{
ScrapeWeb bot = new ScrapeWeb();
SearchApi searchApi = new SearchApi();
SearchHits searchHits = searchApi.Url(url, 5, 0);
if (searchHits.Hits.Count() == 0)
{
obj = await bot.ReturnArticleObject(url);
if (obj.Code != HttpStatusCode.OK)
{
Console.WriteLine(string.Format("\r Status is {0}", obj.Code));
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.HttpCode = obj.Code;
}
else
{
string title = obj.Title;
string content = obj.Contents;
string description = obj.Description;
Articles article = new Articles();
article.Site = url.GetSite();
article.Content = content;
article.Title = title;
article.Url = url.ToLower();
article.Description = description;
string strThumbNail = HtmlHelper.GetImageUrl(url, obj.RawResponse);
article.Author = HtmlHelper.GetAuthor(url, obj.RawResponse);
if (!string.IsNullOrEmpty(strThumbNail))
{
//This condition needs to be added to remove ?n=<number> from EP thumbnails
if (strThumbNail.Contains("?"))
{
article.ImageUrl = strThumbNail.Substring(0, strThumbNail.IndexOf("?")).Replace("http:", "https:");
}
else
article.ImageUrl = strThumbNail.Replace("http:", "https:");
}
else
{
article.ImageUrl = string.IsNullOrEmpty(strThumbNail) ? article.Url.GetDefaultImageUrls() : strThumbNail.Replace("http:", "https:");
}
articleDate = HtmlHelper.GetPublishDate(url, obj.RawResponse);
if (string.IsNullOrEmpty(articleDate))
article.Pubdate = DateTime.Now;
else
article.Pubdate = DateTime.Parse(articleDate);
var client = new Index(searchApi);
var result = client.Upsert(article);
itemfound.HttpCode = obj.Code;
if (result)
{
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate);
UpdateItem(itemfound);
}
else
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
UpdateItem(itemfound, tries, IsUrlProcessed);
}
}
}
else
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = true;
itemfound.HttpCode = HttpStatusCode.OK;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
}
}
catch (Exception e)
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
}
finally
{
DateTime dtEnd = DateTime.Now;
Console.WriteLine(string.Format("\r Total time taken to process items is {0}", (dtEnd - dtStart).TotalSeconds));
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
Done((int)id);
}
}
所有这些代码都基于 Best multi-thread approach for multiple web requests 这个 link。谁能告诉我如何获得这种方法 运行ning?
我认为问题在于您创建任务的方式:
new Task(id => new Worker().DoWork((int)id, item.Url, token), item.Url, token)
此 Task
构造函数重载预期 Action<object>
委托。这意味着 id
将被键入为 object
并且您需要先将其转换回有用的内容。
Parameters
action
- Type:
System.Action<Object>
- The delegate that represents the code to execute in the task.
state
- Type:
System.Object
- An object representing data to be used by the action.
cancellationToken
- Type:
System.Threading.CancellationToken
-The CancellationToken
that that the new task will observe.
您决定通过调用 (int)id
将其转换为 int
,但您将 item.Url
作为对象本身传递。我不能 100% 告诉你 Url
的类型是什么,但我不希望 Url
命名的 属性 是 int
.[=29= 类型]
根据@MarcinJuraszek 所说的,我只是回到我的代码并添加了一个 int,因为我找不到另一种方法来解决它。这是我所做的更改
int i=0
foreach (var item in items)
{
urls.Add(item.Url);
WaitingTasks.Enqueue(new Task(id => new Worker().DoWork((string)id, item.Url, token), item.Url, token));
i++;
}
我编写了一个应用程序,可以遍历我们自己的属性并抓取数据。为确保我不会 运行 通过相同的 URL,我使用 MySQL 数据库来存储 URL,并在处理后对其进行标记。所有这些都是在一个线程中完成的,如果我只有几千个条目也没关系。但是我有几十万个条目需要解析,所以我需要对代码进行更改(我通常是多线程方面的新手)。我找到了一个示例并试图复制样式但似乎不起作用。有人知道以下代码有什么问题吗?
编辑:抱歉并不是要让人们猜测问题,而是我愚蠢地包含了例外。这是例外 "System.InValidCastException: 'Specified cast is not valid.'" 当我启动该过程时,它从数据库中收集 URLs,然后从不访问 DoWork 方法
//这将从数据库中获取条目
List<Mappings> items = bot.GetUrlsToProcess(100);
if (items != null)
{
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
Worker.Done = new Worker.DoneDelegate(WorkerDone);
foreach (var item in items)
{
urls.Add(item.Url);
WaitingTasks.Enqueue(new Task(id => new Worker().DoWork((int)id, item.Url, token), item.Url, token));
}
LaunchTasks();
}
static async void LaunchTasks()
{
// keep checking until we're done
while ((WaitingTasks.Count > 0) || (RunningTasks.Count > 0))
{
// launch tasks when there's room
while ((WaitingTasks.Count > 0) && (RunningTasks.Count < MaxRunningTasks))
{
Task task = WaitingTasks.Dequeue();
lock (RunningTasks) RunningTasks.Add((int)task.AsyncState, task);
task.Start();
}
UpdateConsole();
await Task.Delay(300); // wait before checking again
}
UpdateConsole(); // all done
}
static void UpdateConsole()
{
Console.Write(string.Format("\rwaiting: {0,3:##0} running: {1,3:##0} ", WaitingTasks.Count, RunningTasks.Count));
}
static void WorkerDone(int id)
{
lock (RunningTasks) RunningTasks.Remove(id);
}
public class Worker
{
public delegate void DoneDelegate(int taskId);
public static DoneDelegate Done { private get; set; }
public async void DoWork(object id, string url, CancellationToken token)
{
if (token.IsCancellationRequested) return;
Content obj;
try
{
int tries = 0;
bool IsUrlProcessed = true;
DateTime dtStart = DateTime.Now;
string articleDate = string.Empty;
try
{
ScrapeWeb bot = new ScrapeWeb();
SearchApi searchApi = new SearchApi();
SearchHits searchHits = searchApi.Url(url, 5, 0);
if (searchHits.Hits.Count() == 0)
{
obj = await bot.ReturnArticleObject(url);
if (obj.Code != HttpStatusCode.OK)
{
Console.WriteLine(string.Format("\r Status is {0}", obj.Code));
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.HttpCode = obj.Code;
}
else
{
string title = obj.Title;
string content = obj.Contents;
string description = obj.Description;
Articles article = new Articles();
article.Site = url.GetSite();
article.Content = content;
article.Title = title;
article.Url = url.ToLower();
article.Description = description;
string strThumbNail = HtmlHelper.GetImageUrl(url, obj.RawResponse);
article.Author = HtmlHelper.GetAuthor(url, obj.RawResponse);
if (!string.IsNullOrEmpty(strThumbNail))
{
//This condition needs to be added to remove ?n=<number> from EP thumbnails
if (strThumbNail.Contains("?"))
{
article.ImageUrl = strThumbNail.Substring(0, strThumbNail.IndexOf("?")).Replace("http:", "https:");
}
else
article.ImageUrl = strThumbNail.Replace("http:", "https:");
}
else
{
article.ImageUrl = string.IsNullOrEmpty(strThumbNail) ? article.Url.GetDefaultImageUrls() : strThumbNail.Replace("http:", "https:");
}
articleDate = HtmlHelper.GetPublishDate(url, obj.RawResponse);
if (string.IsNullOrEmpty(articleDate))
article.Pubdate = DateTime.Now;
else
article.Pubdate = DateTime.Parse(articleDate);
var client = new Index(searchApi);
var result = client.Upsert(article);
itemfound.HttpCode = obj.Code;
if (result)
{
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate);
UpdateItem(itemfound);
}
else
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
UpdateItem(itemfound, tries, IsUrlProcessed);
}
}
}
else
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = true;
itemfound.HttpCode = HttpStatusCode.OK;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
}
}
catch (Exception e)
{
tries = itemfound.UrlMaxTries + 1;
IsUrlProcessed = false;
itemfound.DateCreated = DateTime.Parse(articleDate);
itemfound.DateModified = DateTime.Parse(articleDate) == null ? DateTime.Now : DateTime.Parse(articleDate);
}
finally
{
DateTime dtEnd = DateTime.Now;
Console.WriteLine(string.Format("\r Total time taken to process items is {0}", (dtEnd - dtStart).TotalSeconds));
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
Done((int)id);
}
}
所有这些代码都基于 Best multi-thread approach for multiple web requests 这个 link。谁能告诉我如何获得这种方法 运行ning?
我认为问题在于您创建任务的方式:
new Task(id => new Worker().DoWork((int)id, item.Url, token), item.Url, token)
此 Task
构造函数重载预期 Action<object>
委托。这意味着 id
将被键入为 object
并且您需要先将其转换回有用的内容。
Parameters
action
- Type:
System.Action<Object>
- The delegate that represents the code to execute in the task.
state
- Type:
System.Object
- An object representing data to be used by the action.
cancellationToken
- Type:
System.Threading.CancellationToken
-TheCancellationToken
that that the new task will observe.
您决定通过调用 (int)id
将其转换为 int
,但您将 item.Url
作为对象本身传递。我不能 100% 告诉你 Url
的类型是什么,但我不希望 Url
命名的 属性 是 int
.[=29= 类型]
根据@MarcinJuraszek 所说的,我只是回到我的代码并添加了一个 int,因为我找不到另一种方法来解决它。这是我所做的更改
int i=0
foreach (var item in items)
{
urls.Add(item.Url);
WaitingTasks.Enqueue(new Task(id => new Worker().DoWork((string)id, item.Url, token), item.Url, token));
i++;
}