How to preprocess a document before indexation?
I am using Logstash and Elasticsearch with the Twitter input plugin to collect tweets. My problem is that I receive a document from Twitter and I would like to do some preprocessing before indexing it. Let's say I get this as a document result from Twitter:
{
  "tweet": {
    "tweetId": 1025,
    "tweetContent": "Hey this is a fake document for stackoverflow #stackoverflow #elasticsearch",
    "hashtags": ["stackoverflow", "elasticsearch"],
    "publishedAt": "2017 23 August",
    "analytics": {
      "likeNumber": 400,
      "shareNumber": 100
    }
  },
  "author": {
    "authorId": 819744,
    "authorAt": "the_expert",
    "authorName": "John Smith",
    "description": "Haha it's a fake description"
  }
}
Now, based on this document sent to me by Twitter, I would like to generate two documents.
The first one would be indexed at twitter/tweet/1025:
# The id for this document should be the one from tweetId `"tweetId": 1025`
{
  "content": "Hey this is a fake document for stackoverflow #stackoverflow #elasticsearch", # this field has been renamed
  "hashtags": ["stackoverflow", "elasticsearch"],
  "date": "2017/08/23", # the date has been formatted
  "shareNumber": 100 # this field has been flattened
}
The second one would be indexed at twitter/author/819744:
# The id for this document should be the one from authorId `"authorId": 819744`
{
  "authorAt": "the_expert",
  "description": "Haha it's a fake description"
}
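Independent of Logstash, the mapping I am after can be sketched in plain Ruby (the `%Y %d %B` date format is an assumption matching "2017 23 August"):

```ruby
# Sketch of the intended transformation, just to make the mapping explicit.
require 'json'
require 'date'

raw = JSON.parse(<<~DOC)
  {
    "tweet": {
      "tweetId": 1025,
      "tweetContent": "Hey this is a fake document",
      "hashtags": ["stackoverflow", "elasticsearch"],
      "publishedAt": "2017 23 August",
      "analytics": { "likeNumber": 400, "shareNumber": 100 }
    },
    "author": {
      "authorId": 819744,
      "authorAt": "the_expert",
      "authorName": "John Smith",
      "description": "Haha it's a fake description"
    }
  }
DOC

# Document 1: twitter/tweet/1025
tweet_doc = {
  "content"     => raw["tweet"]["tweetContent"],                 # renamed
  "hashtags"    => raw["tweet"]["hashtags"],
  "date"        => Date.strptime(raw["tweet"]["publishedAt"], "%Y %d %B")
                       .strftime("%Y/%m/%d"),                    # reformatted
  "shareNumber" => raw["tweet"]["analytics"]["shareNumber"]      # flattened
}

# Document 2: twitter/author/819744
author_doc = {
  "authorAt"    => raw["author"]["authorAt"],
  "description" => raw["author"]["description"]
}
```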
I have defined my output as follows:
output {
  stdout { codec => dots }
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "twitter"
    document_type => "tweet"
  }
}
How can I process the information coming from Twitter?
EDIT:
So my complete configuration file would look like this:
input {
  twitter {
    consumer_key => "consumer_key"
    consumer_secret => "consumer_secret"
    oauth_token => "access_token"
    oauth_token_secret => "access_token_secret"
    keywords => [ "random", "word" ]
    full_tweet => true
    type => "tweet"
  }
}
filter {
  clone {
    clones => ["author"]
  }
  if [type] == "tweet" {
    mutate {
      remove_field => ["authorId", "authorAt"]
    }
  } else {
    mutate {
      remove_field => ["tweetId", "tweetContent"]
    }
  }
}
output {
  stdout { codec => dots }
  if [type] == "tweet" {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "twitter"
      document_type => "tweet"
      document_id => "%{[tweetId]}"
    }
  } else {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "twitter"
      document_type => "author"
      document_id => "%{[authorId]}"
    }
  }
}
You can use the clone filter plugin in Logstash. The clone filter duplicates each event, and each copy gets its type field set to the corresponding name listed in clones, so the original and the clone can be told apart in later filters and outputs.
Here is a sample Logstash configuration file that takes JSON input from stdin and simply prints the result on stdout:
input {
  stdin {
    codec => json
    type => "tweet"
  }
}

filter {
  mutate {
    add_field => {
      "tweetId" => "%{[tweet][tweetId]}"
      "content" => "%{[tweet][tweetContent]}"
      "date" => "%{[tweet][publishedAt]}"
      "shareNumber" => "%{[tweet][analytics][shareNumber]}"
      "authorId" => "%{[author][authorId]}"
      "authorAt" => "%{[author][authorAt]}"
      "description" => "%{[author][description]}"
    }
  }
  date {
    match => ["date", "yyyy dd MMMM"]
    target => "date"
  }
  ruby {
    code => '
      event.set("hashtags", event.get("[tweet][hashtags]"))
    '
  }
  clone {
    clones => ["author"]
  }
  mutate {
    remove_field => ["author", "tweet", "message"]
  }
  if [type] == "tweet" {
    mutate {
      remove_field => ["authorId", "authorAt", "description"]
    }
  } else {
    mutate {
      remove_field => ["tweetId", "content", "hashtags", "date", "shareNumber"]
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
With this input:
{"tweet": { "tweetId": 1025, "tweetContent": "Hey this is a fake document", "hashtags": ["stackoverflow", "elasticsearch"], "publishedAt": "2017 23 August", "analytics": { "likeNumber": 400, "shareNumber": 100 } }, "author": { "authorId": 819744, "authorAt": "the_expert", "authorName": "John Smith", "description": "fake description" } }
you will get these two documents:
{
           "date" => 2017-08-23T00:00:00.000Z,
       "hashtags" => [
        [0] "stackoverflow",
        [1] "elasticsearch"
    ],
           "type" => "tweet",
        "tweetId" => "1025",
        "content" => "Hey this is a fake document",
    "shareNumber" => "100",
     "@timestamp" => 2017-08-23T20:36:53.795Z,
       "@version" => "1",
           "host" => "my-host"
}
{
    "description" => "fake description",
           "type" => "author",
       "authorId" => "819744",
     "@timestamp" => 2017-08-23T20:36:53.795Z,
       "authorAt" => "the_expert",
       "@version" => "1",
           "host" => "my-host"
}
You could also use a ruby script to flatten the fields, and then use rename in a mutate filter where necessary.
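As a sketch of what such a ruby filter's code string could do, the snippet below flattens the nested fields with event.get/event.set. The SimpleEvent class is only a stand-in for Logstash's event API so the logic can be run on its own; inside a real ruby filter you would use the event object Logstash passes in:

```ruby
# Stand-in for the Logstash event API, for illustration only.
class SimpleEvent
  def initialize(data)
    @data = data
  end

  # Resolve a "[a][b]" style field reference (or a bare top-level name).
  def keys_for(ref)
    keys = ref.scan(/\[([^\]]+)\]/).flatten
    keys.empty? ? [ref] : keys
  end
  private :keys_for

  def get(ref)
    keys_for(ref).reduce(@data) { |acc, k| acc && acc[k] }
  end

  def set(ref, value)
    keys = keys_for(ref)
    last = keys.pop
    target = keys.reduce(@data) { |acc, k| acc[k] ||= {} }
    target[last] = value
  end

  def remove(ref)
    keys = keys_for(ref)
    last = keys.pop
    target = keys.reduce(@data) { |acc, k| acc && acc[k] }
    target.delete(last) if target
  end
end

# The same flattening the add_field block above performs, done in code:
event = SimpleEvent.new(
  "tweet" => {
    "tweetId"   => 1025,
    "analytics" => { "shareNumber" => 100 }
  }
)
event.set("tweetId",     event.get("[tweet][tweetId]"))
event.set("shareNumber", event.get("[tweet][analytics][shareNumber]"))
event.remove("tweet")  # drop the nested original once flattened
```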
If you want Elasticsearch to use the authorId and tweetId instead of the default ids, you can set document_id in the elasticsearch output configuration:
output {
  stdout { codec => dots }
  if [type] == "tweet" {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "twitter"
      document_type => "tweet"
      document_id => "%{[tweetId]}"
    }
  } else {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "twitter"
      document_type => "author"
      document_id => "%{[authorId]}"
    }
  }
}