Elasticsearch - calling UpdateByQuery and Update in parallel causes 409 conflicts

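Working with a large index (100,000 documents), I have a use case that spawns multiple threads which all try to update documents in parallel. The source code updates documents with two methods, Update and UpdateByQuery, so some threads call Update and others call UpdateByQuery. For simplicity, every thread tries to update the same property on all documents.

Here is a small POC that demonstrates the use case:

Index 100,000 documents of type Product, then spawn 100 tasks so that each one calls Update and UpdateByQuery in parallel. Both work on the result of a MatchAll query.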

public async Task ConflictsTestAsync()
{
    // index 100,000 documents.
    IEnumerable<Product> products = CreateProducts(100000);
    await _client.IndexManyAsync(products);

    await _client.Indices.UpdateSettingsAsync("MyIndex", s => s
        .IndexSettings(i => i.Setting(UpdatableIndexSettings.MaxResultWindow, 100000)));

    var searchResponse = _client.Search<Product>(s => s
        .From(0)
        .Size(100000)
        .Query(q => q.MatchAll())
    );
    IReadOnlyCollection<IHit<Product>> getProducts = searchResponse.Hits;

    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    {
        tasks.Add(UpdateByQuery()); // UpdateByQuery takes no arguments; it runs its own MatchAll query
        tasks.Add(Update(getProducts));
    }

    await Task.WhenAll(tasks);
}

public class Product
{
    public string Price { get; set; }
    public string Id { get; set; }
}

UpdateByQuery (sets the price to 0):

private async Task UpdateByQuery()
{
    Func<UpdateByQueryDescriptor<Product>, IUpdateByQueryRequest> updateByQuerySelector = (UpdateByQueryDescriptor<Product> updateByQueryDescriptor) =>
    {
        updateByQueryDescriptor
            .Conflicts(Conflicts.Abort)
            .ErrorTrace()
            .Query(x => x.MatchAll())
            .Script(x => x.Source("ctx._source['price'] = '0'"));               

        IUpdateByQueryRequest result = updateByQueryDescriptor;
        return result;
    };

    await _client.UpdateByQueryAsync(updateByQuerySelector, CancellationToken.None);
}

Update (sets the price to 1):

private async Task Update(IReadOnlyCollection<IHit<Product>> keys)
{
    foreach (IHit<Product> product in keys)
    {
        DocumentPath<Product> id = new DocumentPath<Product>(product.Id);
        Func<UpdateDescriptor<Product, Product>, IUpdateRequest<Product, Product>> updateSelector = (UpdateDescriptor<Product, Product> updateDescriptor) =>
        {
            var page = product.Source;
            page.Price = "1";

            updateDescriptor.Doc(page);

            IUpdateRequest<Product, Product> result = updateDescriptor;

            return result;
        };

        await _client.UpdateAsync<Product>(id, updateSelector);
    }
}

The problem is that running many Update and UpdateByQuery calls concurrently results in conflict exceptions:

Some of the Update calls throw:

Invalid NEST response built from a unsuccessful (409) low level call on POST: /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c Audit trail of this API call:

  • [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.2495465 OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 409 from: POST /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c. ServerError: Type: version_conflict_engine_exception Reason: "[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]" Request: {"doc":{"id":"d75a34ae-2533-4e15-a852-13e98c5b599c","manufacturer":"777e1602-8390-40c8-817e-fdef4e3fb9c0","price":"1","title":"31184e90-f1d1-45be-8746-496a50de2f97","description":"780cc1ab-0a8b-4114-a840-67a528de8e55"}} Response: {"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"}],"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"},"status":409}

And some of the UpdateByQuery calls throw (I have trimmed the failures array):

Invalid NEST response built from a unsuccessful (409) low level call on POST: /MyIndex/_update_by_query?conflicts=abort&error_trace=true Audit trail of this API call:

  • [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.2636546 OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 409 from: POST /MyIndex/_update_by_query?conflicts=abort&error_trace=true Request: {"query":{"match_all":{}},"script":{"source":"ctx._source['price'] = '0'"}} Response: {"took":120,"timed_out":false,"total":100000,"updated":0,"deleted":0,"batches":1,"version_conflicts":1000,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"MyIndex","type":"_doc","id":"d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b","cause":{"type":"version_conflict_engine_exception","reason":"[d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [100000] and primary term [1]","index_uuid":"pPDFKhj6T4y-MpzYECKpxQ","shard":"0"]}

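Because performance matters, I don't want to give up UpdateByQuery (and use only Update), nor do I want to make access to the two methods mutually exclusive. Any suggestions for handling this situation would be greatly appreciated.

Update: data integrity is important, hence .Conflicts(Conflicts.Abort).

Elasticsearch: 7.10.0.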

TL;DR: you can pass conflicts=proceed to the update_by_query API if you want it to keep going even when conflicts occur.

More details: the update_by_query page explains:

When you submit an update by query request, Elasticsearch gets a snapshot of the data stream or index when it begins processing the request and updates matching documents using internal versioning. When the versions match, the document is updated and the version number is incremented. If a document changes between the time that the snapshot is taken and the update operation is processed, it results in a version conflict and the operation fails. You can opt to count version conflicts instead of halting and returning by setting conflicts to proceed.
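To make the quoted mechanism concrete, here is a toy in-memory model of the sequence-number check. It is only a sketch of the idea and does not use Elasticsearch or NEST; the names `VersionConflictDemo`, `Doc`, `Store`, and `TryWrite` are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy model of optimistic concurrency control; NOT Elasticsearch code.
public static class VersionConflictDemo
{
    // A stored document: one field plus a sequence number bumped on every write.
    private sealed class Doc
    {
        public string Price = "";
        public long SeqNo;
    }

    private static readonly Dictionary<string, Doc> Store = new Dictionary<string, Doc>();

    // Applies the write only if the caller saw the latest sequence number.
    private static bool TryWrite(string id, string price, long expectedSeqNo)
    {
        Doc doc = Store[id];
        if (doc.SeqNo != expectedSeqNo)
        {
            return false; // version conflict: another write landed in between
        }

        doc.Price = price;
        doc.SeqNo++;
        return true;
    }

    public static void Main()
    {
        Store["a"] = new Doc { Price = "5", SeqNo = 0 };

        // The "update by query" records the sequence number in its snapshot.
        long snapshotSeqNo = Store["a"].SeqNo;

        // A single-document update is processed before the snapshot is applied.
        bool updateOk = TryWrite("a", "1", Store["a"].SeqNo);

        // The bulk write still expects the snapshot's sequence number, so it is rejected.
        bool byQueryOk = TryWrite("a", "0", snapshotSeqNo);

        Console.WriteLine($"update: {updateOk}, update_by_query: {byQueryOk}, price: {Store["a"].Price}");
    }
}
```

In this model the second write is rejected because the sequence number moved from 0 to 1 after the snapshot was taken, which mirrors the "required seqNo ... current document has seqNo ..." wording in the errors above.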

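So basically, your update and update_by_query calls are trying to update the same documents and conflict with each other. Using conflicts=proceed makes the operation say "oh well, I'll carry on and update the other documents".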
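With NEST 7.x this could look roughly like the sketch below, which reuses the query and script from the question and only switches `Conflicts.Abort` to `Conflicts.Proceed`. The wrapper class `ProductUpdater` and the method name `UpdateByQueryProceedAsync` are hypothetical, and the sketch assumes the client is configured with a default index for `Product`, as in the question.

```csharp
using System;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Nest;

public class Product
{
    public string Price { get; set; }
    public string Id { get; set; }
}

// Hypothetical wrapper class, for illustration only.
public class ProductUpdater
{
    private readonly IElasticClient _client;

    public ProductUpdater(IElasticClient client) => _client = client;

    // Runs the same update_by_query as in the question, but with conflicts=proceed.
    // Returns the number of documents that were skipped because of a version conflict.
    public async Task<long> UpdateByQueryProceedAsync()
    {
        UpdateByQueryResponse response = await _client.UpdateByQueryAsync<Product>(u => u
            .Conflicts(Conflicts.Proceed) // count conflicts instead of aborting
            .Query(q => q.MatchAll())
            .Script(s => s.Source("ctx._source['price'] = '0'")));

        Console.WriteLine(
            $"updated: {response.Updated}, version conflicts: {response.VersionConflicts}");

        return response.VersionConflicts;
    }
}
```

Note that documents counted under version conflicts are not updated by this call; they keep whatever the concurrent Update wrote. If data integrity requires every document to end up with the scripted value, the caller still has to decide what to do when the returned count is non-zero, for example by re-running the query.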