NEST ElasticClient C# bulk insert collection
I'm trying to bulk insert a collection of data into Elasticsearch using the NEST ElasticClient library.
var pool = new SingleNodeConnectionPool(new Uri($"http://localhost:9200"));
var settings = new ConnectionSettings(pool);
var client = new ElasticClient(settings);

var path = @"D:\data.xml";
var serializer = new XmlSerializer(typeof(Entity));

using (TextReader reader = new StreamReader(new FileStream(path, FileMode.Open)))
{
    var collection = (Entity)serializer.Deserialize(reader);
    var elasticSearchEntities = new List<ElasticSearchEntity>();

    for (int i = 0; i < collection.Entity.Length; i++)
    {
        var elasticSearchEntity = new ElasticSearchEntity
        {
            _index = "entity",
            _type = "entity",
            _id = collection.Entity[i].id.ToString(),
            Entity = collection.Entity[i],
        };

        elasticSearchEntities.Add(elasticSearchEntity);
    }

    var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));
}
I set a breakpoint on the line below, and the elasticSearchEntities object contains data.

var indexResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(elasticSearchEntities));

But after running it, no index is created.

If I use Index instead, it works, but it is slow because it inserts the documents one at a time. I need to bulk insert them if possible.
The construction of the bulk input doesn't look right for the low level client. Each bulk operation should consist of two objects:
- an object representing the bulk operation to perform, for example index, along with its associated metadata
- an object representing the document
It looks like the example in the question combines these two into a single object, which is likely to result in an error - the bulk response will contain more details.
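For reference, a minimal sketch of how the low level bulk request could be built instead, assuming the Entity items expose an id property as in the question (names are taken from the question and may need adjusting):

// Each bulk operation is a pair of objects: action/metadata first, then the document source
var operations = new List<object>();

foreach (var entity in collection.Entity)
{
    // the action and its metadata
    operations.Add(new { index = new { _index = "entity", _type = "entity", _id = entity.id.ToString() } });
    // the document itself
    operations.Add(entity);
}

// PostData.MultiJson serializes each object onto its own line (NDJSON), which is what the bulk API expects
var bulkResponse = client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(operations));

if (!bulkResponse.Success)
    Console.WriteLine(bulkResponse.Body);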
As asked in the comments, is there a particular reason you're using the low level client? The high level client has a BulkAll observable helper for indexing large numbers of documents, which is useful when those documents come from another source such as a file or a database.
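A minimal sketch of BulkAll applied to the collection from the question, assuming NEST can infer an id for the Entity type (otherwise ids and routing can be set per operation with BufferToBulk, as in the fuller example below); the sizes and parallelism here are illustrative:

// BulkAll partitions the enumerable into bulk requests and sends them concurrently
var bulkAll = client.BulkAll(collection.Entity, b => b
    .Index("entity")                  // target index
    .Size(1000)                       // documents per bulk request
    .MaxDegreeOfParallelism(4)        // concurrent bulk requests
    .BackOffRetries(2)                // retries per failed bulk request
    .BackOffTime(TimeSpan.FromSeconds(10))
);

// Wait blocks until everything is indexed (or the timeout elapses),
// invoking the callback once per bulk response
bulkAll.Wait(TimeSpan.FromMinutes(10), response =>
    Console.WriteLine($"indexed page {response.Page}"));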
For example, indexing all questions and answers from Stack Overflow's posts.xml archive:
public class Question : Post
{
public string Title { get; set; }
public CompletionField TitleSuggest { get; set; }
public int? AcceptedAnswerId { get; set; }
public int ViewCount { get; set; }
public string LastEditorDisplayName { get; set; }
public List<string> Tags { get; set; }
public int AnswerCount { get; set; }
public int FavoriteCount { get; set; }
public DateTimeOffset? CommunityOwnedDate { get; set; }
public override string Type => nameof(Question);
}
public class Answer : Post
{
public override string Type => nameof(Answer);
}
public class Post
{
public int Id { get; set; }
public JoinField ParentId { get; set; }
public DateTimeOffset CreationDate { get; set; }
public int Score { get; set; }
public string Body { get; set; }
public int? OwnerUserId { get; set; }
public string OwnerDisplayName { get; set; }
public int? LastEditorUserId { get; set; }
public DateTimeOffset? LastEditDate { get; set; }
public DateTimeOffset? LastActivityDate { get; set; }
public int CommentCount { get; set; }
public virtual string Type { get; }
}
void Main()
{
var indexName = "posts";
var node = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var settings = new ConnectionSettings(node)
.RequestTimeout(TimeSpan.FromMinutes(10))
.DefaultMappingFor(new ClrTypeMapping[] {
new ClrTypeMapping(typeof(Post)) { IndexName = indexName },
new ClrTypeMapping(typeof(Question)) { IndexName = indexName, RelationName = "question" },
new ClrTypeMapping(typeof(Answer)) { IndexName = indexName },
})
.OnRequestCompleted(response =>
{
if (response.Success)
Console.WriteLine($"Status: {response.HttpStatusCode}");
else
Console.WriteLine($"Error: {response.DebugInformation}");
});
var client = new ElasticClient(settings);
var characterFilterMappings = CreateCharacterFilterMappings();
if (!client.Indices.Exists(indexName).Exists)
{
var createIndexResponse = client.Indices.Create(indexName, c => c
.Settings(s => s
.NumberOfShards(3)
.NumberOfReplicas(0)
.Analysis(a => a
.CharFilters(cf => cf
.Mapping("programming_language", mca => mca
.Mappings(characterFilterMappings)
)
)
.Analyzers(an => an
.Custom("html", ca => ca
.CharFilters("html_strip", "programming_language")
.Tokenizer("standard")
.Filters("standard", "lowercase", "stop")
)
.Custom("expand", ca => ca
.CharFilters("programming_language")
.Tokenizer("standard")
.Filters("standard", "lowercase", "stop")
)
)
)
)
.Map<Post>(u => u
.RoutingField(r => r.Required())
.AutoMap<Question>()
.AutoMap<Answer>()
.SourceField(s => s
.Excludes(new[] { "titleSuggest" })
)
.Properties<Question>(p => p
.Join(j => j
.Name(f => f.ParentId)
.Relations(r => r
.Join<Question, Answer>()
)
)
.Text(s => s
.Name(n => n.Title)
.Analyzer("expand")
.Norms(false)
.Fields(f => f
.Keyword(ss => ss
.Name("raw")
)
)
)
.Keyword(s => s
.Name(n => n.OwnerDisplayName)
)
.Keyword(s => s
.Name(n => n.LastEditorDisplayName)
)
.Keyword(s => s
.Name(n => n.Tags)
)
.Keyword(s => s
.Name(n => n.Type)
)
.Text(s => s
.Name(n => n.Body)
.Analyzer("html")
.SearchAnalyzer("expand")
)
.Completion(co => co
.Name(n => n.TitleSuggest)
)
)
)
);
if (!createIndexResponse.IsValid)
Console.WriteLine($"invalid response creating index. {createIndexResponse.DebugInformation}");
}
var seenPages = 0;
var handle = new ManualResetEvent(false);
var size = 1000;
var observableBulk = client.BulkAll<Post>(GetQuestionsAndAnswers(), f => f
.MaxDegreeOfParallelism(16)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(size)
.BufferToBulk((bulk, posts) =>
{
foreach (var post in posts)
{
if (post is Question question)
{
var item = new BulkIndexOperation<Question>(question);
bulk.AddOperation(item);
}
else
{
var answer = (Answer)post;
var item = new BulkIndexOperation<Answer>(answer);
bulk.AddOperation(item);
}
}
})
.RefreshOnCompleted()
.Index(indexName)
);
ExceptionDispatchInfo exception = null;
var bulkObserver = new BulkAllObserver(
onError: e =>
{
exception = ExceptionDispatchInfo.Capture(e);
handle.Set();
},
onCompleted: () => handle.Set(),
onNext: b =>
{
Interlocked.Increment(ref seenPages);
Console.WriteLine($"indexed {seenPages} pages");
}
);
observableBulk.Subscribe(bulkObserver);
handle.WaitOne();
if (exception != null)
exception.Throw();
}
public IEnumerable<Post> GetQuestionsAndAnswers()
{
using (var stream = File.OpenRead(@"stackoverflow_data\Posts.xml"))
using (var reader = XmlReader.Create(stream))
{
reader.ReadToDescendant("posts");
reader.ReadToDescendant("row");
do
{
var item = (XElement)XNode.ReadFrom(reader);
var id = int.Parse(item.Attribute("Id").Value);
var postTypeId = int.Parse(item.Attribute("PostTypeId").Value);
var score = int.Parse(item.Attribute("Score").Value);
var body = item.Attribute("Body")?.Value;
var creationDate = DateTimeOffset.Parse(item.Attribute("CreationDate").Value);
var commentCount = int.Parse(item.Attribute("CommentCount").Value);
var ownerUserId = item.Attribute("OwnerUserId") != null
? int.Parse(item.Attribute("OwnerUserId").Value)
: (int?)null;
var ownerDisplayName = item.Attribute("OwnerDisplayName")?.Value;
var lastEditorUserId = item.Attribute("LastEditorUserId") != null
? int.Parse(item.Attribute("LastEditorUserId").Value)
: (int?)null;
var lastEditDate = item.Attribute("LastEditDate") != null
? DateTimeOffset.Parse(item.Attribute("LastEditDate").Value)
: (DateTimeOffset?)null;
var lastActivityDate = item.Attribute("LastActivityDate") != null
? DateTimeOffset.Parse(item.Attribute("LastActivityDate").Value)
: (DateTimeOffset?)null;
switch (postTypeId)
{
case 1:
var title = item.Attribute("Title")?.Value;
var question = new Question
{
Id = id,
ParentId = JoinField.Root<Question>(),
AcceptedAnswerId = item.Attribute("AcceptedAnswerId") != null
? int.Parse(item.Attribute("AcceptedAnswerId").Value)
: (int?)null,
CreationDate = creationDate,
Score = score,
ViewCount = int.Parse(item.Attribute("ViewCount").Value),
Body = body,
OwnerUserId = ownerUserId,
OwnerDisplayName = ownerDisplayName,
LastEditorUserId = lastEditorUserId,
LastEditorDisplayName = item.Attribute("LastEditorDisplayName")?.Value,
LastEditDate = lastEditDate,
LastActivityDate = lastActivityDate,
Title = title,
TitleSuggest = new CompletionField
{
Input = new[] { title },
Weight = score < 0 ? 0 : score
},
Tags = item.Attribute("Tags") != null
? item.Attribute("Tags").Value.Replace("<", string.Empty)
.Split(new[] { ">" }, StringSplitOptions.RemoveEmptyEntries)
.ToList()
: null,
AnswerCount = int.Parse(item.Attribute("AnswerCount").Value),
CommentCount = commentCount,
FavoriteCount = item.Attribute("FavoriteCount") != null
? int.Parse(item.Attribute("FavoriteCount").Value)
: 0,
CommunityOwnedDate = item.Attribute("CommunityOwnedDate") != null
? DateTimeOffset.Parse(item.Attribute("CommunityOwnedDate").Value)
: (DateTimeOffset?)null
};
yield return question;
break;
case 2:
var answer = new Answer
{
Id = id,
ParentId = JoinField.Link<Answer>(int.Parse(item.Attribute("ParentId").Value)),
CreationDate = creationDate,
Body = body,
OwnerUserId = ownerUserId,
OwnerDisplayName = ownerDisplayName,
LastEditorUserId = lastEditorUserId,
LastEditDate = lastEditDate,
LastActivityDate = lastActivityDate,
CommentCount = commentCount,
};
yield return answer;
break;
}
}
while (reader.ReadToNextSibling("row"));
}
}
/*
* Simple char filter mappings to transform common
* programming languages in symbols to words
* e.g. c# => csharp, C++ => cplusplus
*/
private IList<string> CreateCharacterFilterMappings()
{
var mappings = new List<string>();
foreach (var c in new[] { "c", "f", "m", "j", "s", "a", "k", "t" })
{
mappings.Add($"{c}# => {c}sharp");
mappings.Add($"{c.ToUpper()}# => {c}sharp");
}
foreach (var c in new[] { "g", "m", "c", "s", "a", "d" })
{
mappings.Add($"{c}++ => {c}plusplus");
mappings.Add($"{c.ToUpper()}++ => {c}plusplus");
}
return mappings;
}
IEnumerable<Post> GetQuestionsAndAnswers() yields the questions and answers from the large posts.xml file (around 50 GB in size, if I remember correctly), feeding them to BulkAll, which makes up to 16 concurrent bulk requests to Elasticsearch, with each bulk request indexing 1000 documents. See this GitHub repository for a more comprehensive example.