lucene.net IndexWriter 和 Azure WebJob
lucene.net IndexWriter and Azure WebJob
我有一个 Azure webjob 运行 连续触发基于队列触发器。该队列包含需要写入我的 lucene 索引的项目列表。我目前有很多项目在队列中(超过 50 万个项目),我正在寻找最有效的方式来处理它。当我尝试 'scale' 退出网络作业时,我不断收到 IndexWriter Lock 异常。
当前设置:
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
网页作业功能
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
var azureDirectory = new AzureDirectory(CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString), "megadata");
var findexExists = IndexReader.IndexExists(azureDirectory);
var count = items.Count;
IndexWriter indexWriter = null;
int errors = 0;
while (indexWriter == null && errors < 10)
{
try
{
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
}
catch (LockObtainFailedException)
{
log.WriteLine("Lock is taken, Hit 'Y' to clear the lock, or anything else to try again");
errors++;
}
};
if (errors >= 10)
{
azureDirectory.ClearLock("write.lock");
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
log.WriteLine("IndexWriter lock obtained, this process has exclusive write access to index");
indexWriter.SetRAMBufferSizeMB(10.0);
// Parallel.ForEach(items, (itm) =>
//{
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
//});
}
更新索引项的方法基本上是这样的:
private static void AddtoIndex(ListingItem item, IndexWriter indexWriter)
{
var doc = new Document();
doc.Add(new Field("id", item.URL, Field.Store.NO, Field.Index.NOT_ANALYZED, Field.TermVector.NO));
var title = new Field("Title", item.Title, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.YES);
indexWriter.UpdateDocument(new Term("id", item.URL), doc);
}
我尝试过的事情:
- 将 azure config 批大小设置为最大 32
- 使方法异步并使用 Task.WhenAll
- 使用并行for循环
当我尝试上述操作时,它通常会失败:
Lucene.Net.Store.LockObtainFailedException: Lucene.Net.Store.LockObtainFailedException: Lock obtain timed out: AzureLock@write.lock.
at Lucene.Net.Store.Lock.Obtain(Int64 lockWaitTimeout) in d:\Lucene.Net\FullRepo\trunk\src\core\Store\Lock.cs:line 97
at Lucene.Net.Index.IndexWriter.Init(Directory d, Analyzer
关于我如何在架构上设置此 Web 作业以便它可以处理队列中的更多项目而不是一个接一个地做,有什么建议吗?他们需要写入同一个索引?
谢谢
当多个进程试图同时写入 Lucene 索引时,您 运行 遇到了 Lucene 语义问题。缩放 azure 应用程序,使用任务或并行 for 循环只会导致问题,因为当时只有一个进程应该写入 Lucene 索引。
建筑这才是你应该做的
- 确保任何时候只有一个 webjobs 实例是 运行 – 即使
如果网络应用程序缩放(例如通过自动缩放)
- 使用最大 webjob 批处理大小 (32)
- 在每批之后提交 Lucene 索引以最小化 I/O
通过将 settings.job 文件添加到 webjob 项目来确保只有一个 webjob 实例。将构建操作设置为内容并复制到输出目录。将以下 JSON 添加到文件
{ "is_singleton": true }
将 webjob 批处理站点配置为最大值
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
在每批之后提交 Lucene 索引
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
...
indexWriter = new IndexWriter(azureDirectory, …);
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
indexWriter.Commit();
}
这只会在提交 Lucene 索引时写入存储帐户,从而加快索引过程。此外,webjob 批处理还将加快消息处理速度(随时间处理的消息数,而不是单个消息处理时间)。
您可以添加检查以查看 Lucene 索引是否已锁定(write.lock 文件存在)并在批处理开始时解锁索引。这不应该发生,但一切都可能发生,所以我会添加它以确保。
您可以通过使用更大的 Web 应用程序实例(里程可能会有所不同)并使用更快的存储(如 Azure 高级存储)来进一步加快索引过程。
您可以阅读有关 internals of Lucene indexes on Azure on my blog 的更多信息。
我有一个 Azure webjob 运行 连续触发基于队列触发器。该队列包含需要写入我的 lucene 索引的项目列表。我目前有很多项目在队列中(超过 50 万个项目),我正在寻找最有效的方式来处理它。当我尝试 'scale' 退出网络作业时,我不断收到 IndexWriter Lock 异常。
当前设置:
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
网页作业功能
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
var azureDirectory = new AzureDirectory(CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString), "megadata");
var findexExists = IndexReader.IndexExists(azureDirectory);
var count = items.Count;
IndexWriter indexWriter = null;
int errors = 0;
while (indexWriter == null && errors < 10)
{
try
{
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
}
catch (LockObtainFailedException)
{
log.WriteLine("Lock is taken, Hit 'Y' to clear the lock, or anything else to try again");
errors++;
}
};
if (errors >= 10)
{
azureDirectory.ClearLock("write.lock");
indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
log.WriteLine("IndexWriter lock obtained, this process has exclusive write access to index");
indexWriter.SetRAMBufferSizeMB(10.0);
// Parallel.ForEach(items, (itm) =>
//{
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
//});
}
更新索引项的方法基本上是这样的:
private static void AddtoIndex(ListingItem item, IndexWriter indexWriter)
{
var doc = new Document();
doc.Add(new Field("id", item.URL, Field.Store.NO, Field.Index.NOT_ANALYZED, Field.TermVector.NO));
var title = new Field("Title", item.Title, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.YES);
indexWriter.UpdateDocument(new Term("id", item.URL), doc);
}
我尝试过的事情:
- 将 azure config 批大小设置为最大 32
- 使方法异步并使用 Task.WhenAll
- 使用并行for循环
当我尝试上述操作时,它通常会失败:
Lucene.Net.Store.LockObtainFailedException: Lucene.Net.Store.LockObtainFailedException: Lock obtain timed out: AzureLock@write.lock.
at Lucene.Net.Store.Lock.Obtain(Int64 lockWaitTimeout) in d:\Lucene.Net\FullRepo\trunk\src\core\Store\Lock.cs:line 97
at Lucene.Net.Index.IndexWriter.Init(Directory d, Analyzer
关于我如何在架构上设置此 Web 作业以便它可以处理队列中的更多项目而不是一个接一个地做,有什么建议吗?他们需要写入同一个索引? 谢谢
当多个进程试图同时写入 Lucene 索引时,您 运行 遇到了 Lucene 语义问题。缩放 azure 应用程序,使用任务或并行 for 循环只会导致问题,因为当时只有一个进程应该写入 Lucene 索引。
建筑这才是你应该做的
- 确保任何时候只有一个 webjobs 实例是 运行 – 即使 如果网络应用程序缩放(例如通过自动缩放)
- 使用最大 webjob 批处理大小 (32)
- 在每批之后提交 Lucene 索引以最小化 I/O
通过将 settings.job 文件添加到 webjob 项目来确保只有一个 webjob 实例。将构建操作设置为内容并复制到输出目录。将以下 JSON 添加到文件
{ "is_singleton": true }
将 webjob 批处理站点配置为最大值
JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);
host.RunAndBlock();
在每批之后提交 Lucene 索引
public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
...
indexWriter = new IndexWriter(azureDirectory, …);
foreach (var itm in items)
{
AddtoIndex(itm, indexWriter);
}
indexWriter.Commit();
}
这只会在提交 Lucene 索引时写入存储帐户,从而加快索引过程。此外,webjob 批处理还将加快消息处理速度(随时间处理的消息数,而不是单个消息处理时间)。
您可以添加检查以查看 Lucene 索引是否已锁定(write.lock 文件存在)并在批处理开始时解锁索引。这不应该发生,但一切都可能发生,所以我会添加它以确保。
您可以通过使用更大的 Web 应用程序实例(里程可能会有所不同)并使用更快的存储(如 Azure 高级存储)来进一步加快索引过程。
您可以阅读有关 internals of Lucene indexes on Azure on my blog 的更多信息。