lucene.net IndexWriter 和 Azure WebJob

lucene.net IndexWriter and Azure WebJob

我有一个 Azure webjob 运行 连续触发基于队列触发器。该队列包含需要写入我的 lucene 索引的项目列表。我目前有很多项目在队列中(超过 50 万个项目),我正在寻找最有效的方式来处理它。当我尝试 'scale' 退出网络作业时,我不断收到 IndexWriter Lock 异常。

当前设置:

JobHostConfiguration config = new JobHostConfiguration();
            config.Queues.BatchSize = 1;

            var host = new JobHost(config);                        
            host.RunAndBlock();

网页作业功能

     public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
                {
                    var azureDirectory = new AzureDirectory(CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings["StorageConnectionString"].ConnectionString), "megadata");
                    var findexExists = IndexReader.IndexExists(azureDirectory);
                    var count = items.Count;
                    IndexWriter indexWriter = null;
                    int errors = 0;
                    while (indexWriter == null && errors < 10)
                    {
                        try
                        {
                            indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
                        }
                        catch (LockObtainFailedException)
                        {
                            log.WriteLine("Lock is taken, Hit 'Y' to clear the lock, or anything else to try again");
                            errors++;
                        }
                    };
                    if (errors >= 10)
                    {
                        azureDirectory.ClearLock("write.lock");
                        indexWriter = new IndexWriter(azureDirectory, new StandardAnalyzer(Lucene.Net.Util.Version.LUCENE_30), !IndexReader.IndexExists(azureDirectory), new Lucene.Net.Index.IndexWriter.MaxFieldLength(IndexWriter.DEFAULT_MAX_FIELD_LENGTH));
 log.WriteLine("IndexWriter lock obtained, this process has exclusive write access to index");
            indexWriter.SetRAMBufferSizeMB(10.0);
            // Parallel.ForEach(items, (itm) =>
            //{
            foreach (var itm in items)
            {
                AddtoIndex(itm, indexWriter);
            }
            //});
    }

更新索引项的方法基本上是这样的:

private static void AddtoIndex(ListingItem item, IndexWriter indexWriter)
        {            
            var doc = new Document();
            doc.Add(new Field("id", item.URL, Field.Store.NO, Field.Index.NOT_ANALYZED, Field.TermVector.NO));
            var title = new Field("Title", item.Title, Field.Store.YES, Field.Index.ANALYZED, Field.TermVector.YES);
 indexWriter.UpdateDocument(new Term("id", item.URL), doc);
}

我尝试过的事情:

  1. 将 azure config 批大小设置为最大 32
  2. 使方法异步并使用 Task.WhenAll
  3. 使用并行for循环

当我尝试上述操作时,它通常会失败:

Lucene.Net.Store.LockObtainFailedException: Lucene.Net.Store.LockObtainFailedException: Lock obtain timed out: AzureLock@write.lock.
 at Lucene.Net.Store.Lock.Obtain(Int64 lockWaitTimeout) in d:\Lucene.Net\FullRepo\trunk\src\core\Store\Lock.cs:line 97
 at Lucene.Net.Index.IndexWriter.Init(Directory d, Analyzer

关于我如何在架构上设置此 Web 作业以便它可以处理队列中的更多项目而不是一个接一个地做,有什么建议吗?他们需要写入同一个索引? 谢谢

当多个进程试图同时写入 Lucene 索引时,您 运行 遇到了 Lucene 语义问题。缩放 azure 应用程序,使用任务或并行 for 循环只会导致问题,因为当时只有一个进程应该写入 Lucene 索引。

建筑这才是你应该做的

  • 确保任何时候只有一个 webjobs 实例是 运行 – 即使 如果网络应用程序缩放(例如通过自动缩放)
  • 使用最大 webjob 批处理大小 (32)
  • 在每批之后提交 Lucene 索引以最小化 I/O

通过将 settings.job 文件添加到 webjob 项目来确保只有一个 webjob 实例。将构建操作设置为内容并复制到输出目录。将以下 JSON 添加到文件

{ "is_singleton": true }

将 webjob 批处理站点配置为最大值

JobHostConfiguration config = new JobHostConfiguration();
config.Queues.BatchSize = 1;
var host = new JobHost(config);                        
host.RunAndBlock();

在每批之后提交 Lucene 索引

public static void AddToSearchIndex([QueueTrigger("indexsearchadd")] List<ListingItem> items, TextWriter log)
{
    ...
    indexWriter = new IndexWriter(azureDirectory, …);

    foreach (var itm in items)
    {
        AddtoIndex(itm, indexWriter);
    }
    indexWriter.Commit();
}

这只会在提交 Lucene 索引时写入存储帐户,从而加快索引过程。此外,webjob 批处理还将加快消息处理速度(随时间处理的消息数,而不是单个消息处理时间)。

您可以添加检查以查看 Lucene 索引是否已锁定(write.lock 文件存在)并在批处理开始时解锁索引。这不应该发生,但一切都可能发生,所以我会添加它以确保。

您可以通过使用更大的 Web 应用程序实例(里程可能会有所不同)并使用更快的存储(如 Azure 高级存储)来进一步加快索引过程。

您可以阅读有关 internals of Lucene indexes on Azure on my blog 的更多信息。