NEST ElasticClient C# 批量插入集合

NEST ElasticClient C# bulk insert collection

我正在尝试使用 NEST ElasticClient 库将一组数据批量插入 Elastic Search。

    // Build a client that talks to a single local Elasticsearch node.
    var pool = new SingleNodeConnectionPool(new Uri($"http://localhost:9200"));
    var settings = new ConnectionSettings(pool);
    var client = new ElasticClient(settings);

    // Location of the XML document and the serializer that turns it into an Entity graph.
    var path = @"D:\data.xml";
    var serializer = new XmlSerializer(typeof(Entity));

    using (TextReader reader = new StreamReader(new FileStream(path, FileMode.Open)))
    {
        var collection = (Entity)serializer.Deserialize(reader);

        // Wrap every deserialized child in an ElasticSearchEntity that carries the
        // target index, type and id next to the document itself.
        var elasticSearchEntities = new List<ElasticSearchEntity>();
        foreach (var child in collection.Entity)
        {
            elasticSearchEntities.Add(new ElasticSearchEntity
            {
                _index = "entity",
                _type = "entity",
                _id = child.id.ToString(),
                Entity = child,
            });
        }

        // Hand the whole list to the low-level bulk endpoint in a single request.
        var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
    }

我在下面这一行设置了一个断点,elasticSearchEntities 对象中有数据。

var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));

但是运行后没有创建索引

如果我使用 Index,它可以工作,但速度很慢,因为它会一个接一个地插入。如果可能,我需要批量插入。

对于低级客户端的批量 API 来说,这里构造输入的方式看起来不正确。每个批量操作应由两个对象组成:

  1. 表示要执行的批量操作的对象,例如索引和关联的元数据
  2. 表示文档的对象

问题中的示例似乎把这两者合并成了一个对象,这很可能会导致错误——批量响应中会包含更详细的错误信息。

正如评论中所问的,您使用低级别客户端是否有特别的原因?高级客户端中提供了一个可观察的批量辅助方法(BulkAll),可用于索引大量文档;当这些文档来自文件或数据库等其他数据源时,它尤其有用。

例如,下面的代码索引 Stack Overflow 的 posts.xml 存档中的所有问题和答案:

/// <summary>
/// A question post: a <see cref="Post"/> extended with the fields that only
/// questions carry. Instances are built by GetQuestionsAndAnswers from rows
/// whose PostTypeId attribute is 1.
/// </summary>
public class Question : Post
{
    /// <summary>Question title, read from the row's Title attribute (null when absent).</summary>
    public string Title { get; set; }

    /// <summary>
    /// Completion suggester payload built from the title. Main maps it as a
    /// completion field and excludes "titleSuggest" from the stored source.
    /// </summary>
    public CompletionField TitleSuggest { get; set; }

    /// <summary>Id of the accepted answer, or null when the row has no AcceptedAnswerId attribute.</summary>
    public int? AcceptedAnswerId { get; set; }

    /// <summary>Value of the row's ViewCount attribute.</summary>
    public int ViewCount { get; set; }

    /// <summary>Value of the row's LastEditorDisplayName attribute (null when absent).</summary>
    public string LastEditorDisplayName { get; set; }

    /// <summary>
    /// Tag names obtained from the row's Tags attribute by removing '&lt;' and
    /// splitting on '&gt;'; null when the attribute is absent.
    /// </summary>
    public List<string> Tags { get; set; }

    /// <summary>Value of the row's AnswerCount attribute.</summary>
    public int AnswerCount { get; set; }

    /// <summary>Value of the row's FavoriteCount attribute, or 0 when absent.</summary>
    public int FavoriteCount { get; set; }

    /// <summary>Value of the row's CommunityOwnedDate attribute, or null when absent.</summary>
    public DateTimeOffset? CommunityOwnedDate { get; set; }

    /// <summary>Always the string "Question".</summary>
    public override string Type => nameof(Question);
}

/// <summary>
/// An answer post. It adds no fields of its own to <see cref="Post"/>; instances
/// are built by GetQuestionsAndAnswers from rows whose PostTypeId attribute is 2.
/// </summary>
public class Answer : Post
{
    /// <summary>Always the string "Answer".</summary>
    public override string Type => nameof(Answer);
}

/// <summary>
/// Fields shared by <see cref="Question"/> and <see cref="Answer"/> documents.
/// This is also the document type passed to BulkAll in Main.
/// </summary>
public class Post
{
    /// <summary>Value of the row's Id attribute.</summary>
    public int Id { get; set; }

    /// <summary>
    /// Join field relating questions and answers. Questions are assigned
    /// JoinField.Root&lt;Question&gt;(); answers are assigned
    /// JoinField.Link&lt;Answer&gt;(parentId) using the row's ParentId attribute.
    /// </summary>
    public JoinField ParentId { get; set; }

    /// <summary>Value of the row's CreationDate attribute.</summary>
    public DateTimeOffset CreationDate { get; set; }

    /// <summary>Value of the row's Score attribute.</summary>
    public int Score { get; set; }

    /// <summary>Value of the row's Body attribute (null when absent); mapped in Main with the "html" analyzer.</summary>
    public string Body { get; set; }

    /// <summary>Value of the row's OwnerUserId attribute, or null when absent.</summary>
    public int? OwnerUserId { get; set; }

    /// <summary>Value of the row's OwnerDisplayName attribute (null when absent).</summary>
    public string OwnerDisplayName { get; set; }

    /// <summary>Value of the row's LastEditorUserId attribute, or null when absent.</summary>
    public int? LastEditorUserId { get; set; }

    /// <summary>Value of the row's LastEditDate attribute, or null when absent.</summary>
    public DateTimeOffset? LastEditDate { get; set; }

    /// <summary>Value of the row's LastActivityDate attribute, or null when absent.</summary>
    public DateTimeOffset? LastActivityDate { get; set; }

    /// <summary>Value of the row's CommentCount attribute.</summary>
    public int CommentCount { get; set; }

    /// <summary>
    /// Name of the concrete post kind. The base property is getter-only and is
    /// never assigned in this class, so it is null unless a subclass overrides it.
    /// </summary>
    public virtual string Type { get; }
}

/// <summary>
/// Creates the "posts" index (when it does not exist yet) with custom analyzers
/// and a question/answer join mapping, then bulk indexes every post yielded by
/// <c>GetQuestionsAndAnswers</c> through the <c>BulkAll</c> observable helper.
/// Blocks until indexing has completed or failed; a failure reported by the
/// observable is rethrown on the calling thread with its original stack trace.
/// </summary>
void Main()
{
    var indexName = "posts";
    var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionSettings(node)
        .RequestTimeout(TimeSpan.FromMinutes(10))
        .DefaultMappingFor(new ClrTypeMapping[] {
            new ClrTypeMapping(typeof(Post)) { IndexName = indexName },
            new ClrTypeMapping(typeof(Question)) { IndexName = indexName, RelationName = "question" },
            new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },
        })
        // Log the outcome of every request made through this client.
        .OnRequestCompleted(response =>
        {
            if (response.Success)
                Console.WriteLine($"Status: {response.HttpStatusCode}");
            else
                Console.WriteLine($"Error: {response.DebugInformation}");
        });


    var client = new ElasticClient(settings);
    var characterFilterMappings = CreateCharacterFilterMappings();

    if (!client.Indices.Exists(indexName).Exists)
    {
        // NOTE: the "standard" token filter is deliberately not listed in the
        // analyzers below. It was a no-op and was removed in Elasticsearch 7.0;
        // referencing it makes index creation fail on 7.x clusters.
        var createIndexResponse = client.Indices.Create(indexName, c => c
            .Settings(s => s
                .NumberOfShards(3)
                .NumberOfReplicas(0)
                .Analysis(a => a
                    .CharFilters(cf => cf
                        .Mapping("programming_language", mca => mca
                            .Mappings(characterFilterMappings)
                        )
                    )
                    .Analyzers(an => an
                        .Custom("html", ca => ca
                            .CharFilters("html_strip", "programming_language")
                            .Tokenizer("standard")
                            .Filters("lowercase", "stop")
                        )
                        .Custom("expand", ca => ca
                            .CharFilters("programming_language")
                            .Tokenizer("standard")
                            .Filters("lowercase", "stop")
                        )
                    )
                )
            )
            .Map<Post>(u => u
                .RoutingField(r => r.Required())
                .AutoMap<Question>()
                .AutoMap<Answer>()
                .SourceField(s => s
                    .Excludes(new[] { "titleSuggest" })
                )
                .Properties<Question>(p => p
                    .Join(j => j
                        .Name(f => f.ParentId)
                        .Relations(r => r
                            .Join<Question, Answer>()
                        )
                    )
                    .Text(s => s
                        .Name(n => n.Title)
                        .Analyzer("expand")
                        .Norms(false)
                        .Fields(f => f
                            .Keyword(ss => ss
                                .Name("raw")
                            )
                        )
                    )
                    .Keyword(s => s
                        .Name(n => n.OwnerDisplayName)
                    )
                    .Keyword(s => s
                        .Name(n => n.LastEditorDisplayName)
                    )
                    .Keyword(s => s
                        .Name(n => n.Tags)
                    )
                    .Keyword(s => s
                        .Name(n => n.Type)
                    )
                    .Text(s => s
                        .Name(n => n.Body)
                        .Analyzer("html")
                        .SearchAnalyzer("expand")
                    )
                    .Completion(co => co
                        .Name(n => n.TitleSuggest)
                    )
                )
            )
        );

        if (!createIndexResponse.IsValid)
        {
            Console.WriteLine($"invalid response creating index. {createIndexResponse.DebugInformation}");

            // Stop here: bulk indexing would otherwise run against an index that
            // does not have the settings and mappings defined above.
            return;
        }
    }

    var seenPages = 0;
    var handle = new ManualResetEvent(false);
    var size = 1000;

    var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(), f => f
        .MaxDegreeOfParallelism(16)
        .BackOffTime(TimeSpan.FromSeconds(10))
        .BackOffRetries(2)
        .Size(size)
        // Add each post as an index operation typed by its concrete class
        // (Question or Answer) rather than by the Post base type.
        .BufferToBulk((bulk, posts) =>
        {
            foreach (var post in posts)
            {
                if (post is Question question)
                {
                    var item = new BulkIndexOperation<Question>(question);
                    bulk.AddOperation(item);
                }
                else
                {
                    var answer = (Answer)post;
                    var item = new BulkIndexOperation<Answer>(answer);
                    bulk.AddOperation(item);
                }
            }
        })
        .RefreshOnCompleted()
        .Index(indexName)
    );

    ExceptionDispatchInfo exception = null;

    var bulkObserver = new BulkAllObserver(
        onError: e =>
        {
            // Capture the failure so it can be rethrown on the waiting thread.
            exception = ExceptionDispatchInfo.Capture(e);
            handle.Set();
        },
        onCompleted: () => handle.Set(),
        onNext: b =>
        {
            // Print the value returned by Interlocked.Increment; re-reading the
            // shared counter could observe increments made by other callbacks
            // (presumably these run concurrently, given the parallelism above).
            var pages = Interlocked.Increment(ref seenPages);
            Console.WriteLine($"indexed {pages} pages");
        }
    );

    observableBulk.Subscribe(bulkObserver);

    // Block until the observer signals completion or an error.
    handle.WaitOne();

    exception?.Throw();
}

/// <summary>
/// Streams the posts archive and lazily yields one <see cref="Question"/>
/// (PostTypeId 1) or <see cref="Answer"/> (PostTypeId 2) per <c>row</c> element.
/// Rows with any other PostTypeId are skipped. The file is read one row at a
/// time, so the whole archive is never held in memory.
/// </summary>
/// <returns>
/// A lazy sequence of posts; empty when the document has no <c>posts</c>
/// element or no <c>row</c> element inside it.
/// </returns>
/// <exception cref="NullReferenceException">
/// A row lacks a mandatory attribute (Id, PostTypeId, Score, CreationDate,
/// CommentCount; ViewCount and AnswerCount for questions; ParentId for answers).
/// </exception>
/// <exception cref="FormatException">An attribute value cannot be parsed.</exception>
public IEnumerable<Post> GetQuestionsAndAnswers()
{
    // The archive is machine-readable data: parse it with the invariant culture
    // so results do not depend on the regional settings of the host.
    var invariant = System.Globalization.CultureInfo.InvariantCulture;

    // Parses an optional integer attribute; null when the attribute is absent.
    int? ParseOptionalInt(XElement element, string attributeName)
    {
        var attribute = element.Attribute(attributeName);
        return attribute != null ? int.Parse(attribute.Value, invariant) : (int?)null;
    }

    // Parses an optional date attribute; null when the attribute is absent.
    DateTimeOffset? ParseOptionalDate(XElement element, string attributeName)
    {
        var attribute = element.Attribute(attributeName);
        return attribute != null ? DateTimeOffset.Parse(attribute.Value, invariant) : (DateTimeOffset?)null;
    }

    using (var stream = File.OpenRead(@"Whosebug_data\Posts.xml"))
    using (var reader = XmlReader.Create(stream))
    {
        // Nothing to yield when there is no <posts> element or it has no <row>.
        if (!reader.ReadToDescendant("posts") || !reader.ReadToDescendant("row"))
            yield break;

        bool hasMoreRows;
        do
        {
            var item = (XElement)XNode.ReadFrom(reader);
            var id = int.Parse(item.Attribute("Id").Value, invariant);
            var postTypeId = int.Parse(item.Attribute("PostTypeId").Value, invariant);
            var score = int.Parse(item.Attribute("Score").Value, invariant);
            var body = item.Attribute("Body")?.Value;
            // NOTE(review): values without an explicit offset are interpreted in
            // the local time zone by DateTimeOffset.Parse — confirm that this is
            // what the archive's timestamps require.
            var creationDate = DateTimeOffset.Parse(item.Attribute("CreationDate").Value, invariant);
            var commentCount = int.Parse(item.Attribute("CommentCount").Value, invariant);
            var ownerUserId = ParseOptionalInt(item, "OwnerUserId");
            var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value;
            var lastEditorUserId = ParseOptionalInt(item, "LastEditorUserId");
            var lastEditDate = ParseOptionalDate(item, "LastEditDate");
            var lastActivityDate = ParseOptionalDate(item, "LastActivityDate");

            switch (postTypeId)
            {
                case 1:
                    var title = item.Attribute("Title")?.Value;

                    var question = new Question
                    {
                        Id = id,
                        ParentId = JoinField.Root<Question>(),
                        AcceptedAnswerId = ParseOptionalInt(item, "AcceptedAnswerId"),
                        CreationDate = creationDate,
                        Score = score,
                        ViewCount = int.Parse(item.Attribute("ViewCount").Value, invariant),
                        Body = body,
                        OwnerUserId = ownerUserId,
                        OwnerDisplayName = ownerDisplayName,
                        LastEditorUserId = lastEditorUserId,
                        LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value,
                        LastEditDate = lastEditDate,
                        LastActivityDate = lastActivityDate,
                        Title = title,
                        TitleSuggest = new CompletionField
                        {
                            Input = new[] { title },
                            // Negative scores are clamped to a weight of 0.
                            Weight = score < 0 ? 0 : score
                        },
                        // Tags are stored as "<a><b>": drop '<' and split on '>'.
                        Tags = item.Attribute("Tags") != null
                            ? item.Attribute("Tags").Value.Replace("<", string.Empty)
                                .Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
                                .ToList()
                            : null,
                        AnswerCount = int.Parse(item.Attribute("AnswerCount").Value, invariant),
                        CommentCount = commentCount,
                        FavoriteCount = ParseOptionalInt(item, "FavoriteCount") ?? 0,
                        CommunityOwnedDate = ParseOptionalDate(item, "CommunityOwnedDate")
                    };

                    yield return question;
                    break;
                case 2:
                    var answer = new Answer
                    {
                        Id = id,
                        ParentId = JoinField.Link<Answer>(int.Parse(item.Attribute("ParentId").Value, invariant)),
                        CreationDate = creationDate,
                        // Answers carry a score as well; it was previously parsed but never assigned.
                        Score = score,
                        Body = body,
                        OwnerUserId = ownerUserId,
                        OwnerDisplayName = ownerDisplayName,
                        LastEditorUserId = lastEditorUserId,
                        LastEditDate = lastEditDate,
                        LastActivityDate = lastActivityDate,
                        CommentCount = commentCount,
                    };

                    yield return answer;
                    break;
            }

            // XNode.ReadFrom leaves the reader on the node that FOLLOWS the element
            // it consumed. When rows are not separated by whitespace that node is
            // already the next <row>, and ReadToNextSibling would skip over it, so
            // check the current node before advancing.
            hasMoreRows = (reader.NodeType == XmlNodeType.Element && reader.Name == "row")
                || reader.ReadToNextSibling("row");
        }
        while (hasMoreRows);
    }
}

/// <summary>
/// Builds simple char filter mappings that turn common programming language
/// names written with symbols into plain words,
/// e.g. c# => csharp, C++ => cplusplus.
/// </summary>
/// <returns>
/// Rules of the form "key => value". Each letter contributes two rules, one
/// keyed by its lower-case form and one by its upper-case form, and both map
/// to the same lower-case word. All "#" rules come before all "++" rules.
/// </returns>
private IList<string> CreateCharacterFilterMappings()
{
    var mappings = new List<string>();

    // Appends the lower- and upper-case rule for every letter.
    void AddMappings(string[] letters, string symbol, string word)
    {
        foreach (var letter in letters)
        {
            mappings.Add($"{letter}{symbol} => {letter}{word}");
            // Invariant casing: the rule keys must not vary with the host culture.
            mappings.Add($"{letter.ToUpperInvariant()}{symbol} => {letter}{word}");
        }
    }

    AddMappings(new[] { "c", "f", "m", "j", "s", "a", "k", "t" }, "#", "sharp");
    AddMappings(new[] { "g", "m", "c", "s", "a", "d" }, "++", "plusplus");

    return mappings;
}

IEnumerable<Post> GetQuestionsAndAnswers() 从大型 posts.xml 文件(如果我没记错,大小约为 50GB)中逐个产生(yield)问题和答案,并把它们提供给 BulkAll;BulkAll 会同时向 Elasticsearch 并发发出最多 16 个批量请求,每个批量请求索引 1000 个文档。更完整的示例请参见此 GitHub 仓库。