ElasticSearch:将旧访客数据放入索引
ElasticSearch: Getting old visitor data into an index
我正在学习 ElasticSearch,希望将我的业务数据转储到 ES 中并使用 Kibana 查看它。经过一周的各种问题后,我终于让 ES 和 Kibana 在 2 Ubuntu 14.04 台式机(集群)上工作(分别为 1.7.0 和 4)。
我现在遇到的问题是如何最好地将数据导入ES。数据流是我为每次访问具有唯一 ID 的文本文件捕获 PHP 全局变量 $_REQUEST 和 $_SERVER。从那里,如果他们填写表格,我会在一个文本文件中捕获该数据,该文本文件在不同的目录中也以该唯一 ID 命名。然后我的客户告诉我该表格填写是否有任何好处,最多延迟 50 天。
所以我从访问者数据开始 - $_REQUEST 和 $_SERVER。很多都是多余的,所以我真的只是试图捕获他们到达的时间戳、他们的 IP、他们访问的服务器的 IP、他们访问的域、唯一 ID 和他们的用户代理。所以我创建了这个映射:
time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # Originally this included 'index': 'not analyzed' as well
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping, # Stored as a Unix timestamp
'Request Time': time_date_mapping, # Stored as a Unix timestamp
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
},
}
然后我将它输入到 ES 中:
es.index(
index=Visit_to_ElasticSearch.INDEX,
doc_type=Visit_to_ElasticSearch.DOC_TYPE,
id=self.uniqID,
timestamp=int(math.floor(self._visit['Entrance Time'])),
body=visit
)
当我在 ES 上查看索引中的数据时,只有 Entrance Time、_id、_type、domain 和 uniqID 被索引以供搜索(根据 Kibana)。所有数据都存在于文档中,但大多数字段显示 "Unindexed fields can not be searched."
此外,我试图只获取代理的饼图。但我无法想出可视化,因为无论我点击什么框,代理字段都不是聚合的选项。刚才提到它是因为索引的字段似乎确实显示了。
我试图模仿引入 github 的 elasticsearch.py 示例中的映射示例。有人可以纠正我如何使用该地图吗?
谢谢
------------映射------------
{
"visits": {
"mappings": {
"visit": {
"properties": {
"Agent": {
"type": "string"
},
"Entrance Time": {
"type": "date",
"format": "dateOptionalTime"
},
"Raw": {
"properties": {
"Entrance Time": {
"type": "double"
},
"domain": {
"type": "string"
},
"uniqID": {
"type": "string"
}
}
},
"Referrer": {
"type": "string"
},
"Request Time": {
"type": "string"
},
"Srvr IP": {
"type": "string"
},
"Visitor IP": {
"type": "string"
},
"domain": {
"type": "string"
},
"uniqID": {
"type": "string"
}
}
}
}
}
}
------------更新和新映射------------
所以我删除了索引并重新创建了它。在我对将数据映射到特定字段类型一无所知之前,原始索引中就有一些数据。这似乎解决了只有几个字段被索引的问题。
但是,我的部分映射似乎被忽略了。特别是代理字符串映射:
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string', 'index': 'not_analyzed' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping,
'Request Time': time_date_mapping,
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
},
}
这是http://localhost:9200/visits_test2/_mapping
的输出
{
"visits_test2": {
"mappings": {
"visit": {
"properties": {
"Agent":{"type":"string"},
"Entrance Time": {"type":"date","format":"dateOptionalTime"},
"Raw": {
"properties": {
"Entrance Time":{"type":"double"},
"domain":{"type":"string"},
"uniqID":{"type":"string"}
}
},
"Referrer":{"type":"string"},
"Request Time": {"type":"date","format":"dateOptionalTime"},
"Srvr IP":{"type":"string"},
"Visitor IP":{"type":"string"},
"domain":{"type":"string"},
"uniqID":{"type":"string"}
}
}
}
}
}
请注意,我使用了一个全新的索引。原因是我想确保没有任何东西从一个转移到另一个。
请注意,我正在使用 Python 库 elasticsearch.py 并遵循其映射语法示例。
---------- Python 根据评论请求将数据输入 ES 的代码 ----------
下面是一个文件名mapping.py,我还没有完全注释代码,因为这只是测试这种数据进入ES的方法是否可行的代码。如果它不是不言自明的,请告诉我,我会添加额外的评论。
请注意,我在 PHP 编程多年,然后才开始学习 Python。为了使用 Python 更快地起床和 运行 我创建了几个具有基本字符串和文件操作功能的文件,并将它们打包成一个包。它们是用 Python 编写的,旨在模仿内置 PHP 函数的行为。因此,当您看到对 php_basic_* 的调用时,它就是其中一个函数。
# Standard Library Imports
import json, copy, datetime, time, enum, os, sys, numpy, math
from datetime import datetime
from enum import Enum, unique
from elasticsearch import Elasticsearch
# My Library
import basicconfig, mybasics
from mybasics.cBaseClass import BaseClass, BaseClassErrors
from mybasics.cHelpers import HandleErrors, LogLvl
# This imports several constants, a couple of functions, and a helper class
from basicconfig.startup_config import *
# Connect to ElasticSearch
es = Elasticsearch([{'host': 'localhost', 'port': '9200'}])
# Create mappings of a visit
time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # This originally included 'index': 'not_analyzed' as well
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string', 'index': 'not_analyzed' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping,
'Request Time': time_date_mapping,
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
'Pages': { 'type': 'string', 'index': 'not_analyzed' },
},
}
class Visit_to_ElasticSearch(object):
"""
"""
INDEX = 'visits'
DOC_TYPE = 'visit'
def __init__(self, fname, index=True):
"""
"""
self._visit = json.loads(php_basic_files.file_get_contents(fname))
self._pages = self._visit.pop('pages')
self.uniqID = self._visit['uniqID']
self.domain = self._visit['domain']
self.entrance_time = self._convert_time(self._visit['Entrance Time'])
# Get a list of the page IDs
self.pages = self._pages.keys()
# Extra IPs and such from a single page
page = self._pages[self.pages[0]]
srvr = page['SERVER']
req = page['REQUEST']
self.visitor_ip = srvr['REMOTE_ADDR']
self.srvr_ip = srvr['SERVER_ADDR']
self.request_time = self._convert_time(srvr['REQUEST_TIME'])
self.agent = srvr['HTTP_USER_AGENT']
# Now go grab data that might not be there...
self._extract_optional()
if index is True:
self.index_with_elasticsearch()
def _convert_time(self, ts):
"""
"""
try:
dt = datetime.fromtimestamp(ts)
except TypeError:
dt = datetime.fromtimestamp(float(ts))
return dt.strftime('%Y-%m-%dT%H:%M:%S')
def _extract_optional(self):
"""
"""
self.referrer = ''
def index_with_elasticsearch(self):
"""
"""
visit = {
'uniqID': self.uniqID,
'pages': [],
'domain': self.domain,
'Srvr IP': self.srvr_ip,
'Visitor IP': self.visitor_ip,
'Agent': self.agent,
'Referrer': self.referrer,
'Entrance Time': self.entrance_time,
'Request Time': self.request_time,
'Raw': self._visit,
'Pages': php_basic_str.implode(', ', self.pages),
}
es.index(
index=Visit_to_ElasticSearch.INDEX,
doc_type=Visit_to_ElasticSearch.DOC_TYPE,
id=self.uniqID,
timestamp=int(math.floor(self._visit['Entrance Time'])),
body=visit
)
es.indices.create(
index=Visit_to_ElasticSearch.INDEX,
body={
'settings': {
'number_of_shards': 5,
'number_of_replicas': 1,
}
},
# ignore already existing index
ignore=400
)
万一重要,这是我用来将数据转储到 ES 中的简单循环:
for f in all_files:
try:
visit = mapping.Visit_to_ElasticSearch(f)
except IOError:
pass
其中 all_files
是我的测试数据集中所有访问文件的列表(完整路径)。
这是来自 Google 机器人访问的示例访问文件:
{u'Entrance Time': 1407551587.7385,
u'domain': u'############',
u'pages': {u'6818555600ccd9880bf7acef228c5d47': {u'REQUEST': [],
u'SERVER': {u'DOCUMENT_ROOT': u'/var/www/####/',
u'Entrance Time': 1407551587.7385,
u'GATEWAY_INTERFACE': u'CGI/1.1',
u'HTTP_ACCEPT': u'*/*',
u'HTTP_ACCEPT_ENCODING': u'gzip,deflate',
u'HTTP_CONNECTION': u'Keep-alive',
u'HTTP_FROM': u'googlebot(at)googlebot.com',
u'HTTP_HOST': u'############',
u'HTTP_IF_MODIFIED_SINCE': u'Fri, 13 Jun 2014 20:26:33 GMT',
u'HTTP_USER_AGENT': u'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
u'PATH': u'/usr/local/bin:/usr/bin:/bin',
u'PHP_SELF': u'/index.php',
u'QUERY_STRING': u'',
u'REDIRECT_SCRIPT_URI': u'http://############/',
u'REDIRECT_SCRIPT_URL': u'############',
u'REDIRECT_STATUS': u'200',
u'REDIRECT_URL': u'############',
u'REMOTE_ADDR': u'############',
u'REMOTE_PORT': u'46271',
u'REQUEST_METHOD': u'GET',
u'REQUEST_TIME': u'1407551587',
u'REQUEST_URI': u'############',
u'SCRIPT_FILENAME': u'/var/www/PIAN/index.php',
u'SCRIPT_NAME': u'/index.php',
u'SCRIPT_URI': u'http://############/',
u'SCRIPT_URL': u'/############/',
u'SERVER_ADDR': u'############',
u'SERVER_ADMIN': u'admin@############',
u'SERVER_NAME': u'############',
u'SERVER_PORT': u'80',
u'SERVER_PROTOCOL': u'HTTP/1.1',
u'SERVER_SIGNATURE': u'<address>Apache/2.2.22 (Ubuntu) Server at ############ Port 80</address>\n',
u'SERVER_SOFTWARE': u'Apache/2.2.22 (Ubuntu)',
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'},
u'SESSION': {u'Entrance Time': 1407551587.7385,
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}}},
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}
现在我更好地理解了为什么 Raw
字段是一个对象而不是简单的 string
,因为它被赋值 self._visit
而后者又被 json.loads(php_basic_files.file_get_contents(fname))
初始化。
无论如何,根据您在上面提供的所有信息,我认为映射从未通过 put_mapping
安装。从那时起,没有任何其他方法可以按照您喜欢的方式工作。我建议您修改代码以安装映射 ,然后 为您的第一个 visit
文档编制索引。
我正在学习 ElasticSearch,希望将我的业务数据转储到 ES 中并使用 Kibana 查看它。经过一周的各种问题后,我终于让 ES 和 Kibana 在 2 Ubuntu 14.04 台式机(集群)上工作(分别为 1.7.0 和 4)。
我现在遇到的问题是如何最好地将数据导入ES。数据流是我为每次访问具有唯一 ID 的文本文件捕获 PHP 全局变量 $_REQUEST 和 $_SERVER。从那里,如果他们填写表格,我会在一个文本文件中捕获该数据,该文本文件在不同的目录中也以该唯一 ID 命名。然后我的客户告诉我该表格填写是否有任何好处,最多延迟 50 天。
所以我从访问者数据开始 - $_REQUEST 和 $_SERVER。很多都是多余的,所以我真的只是试图捕获他们到达的时间戳、他们的 IP、他们访问的服务器的 IP、他们访问的域、唯一 ID 和他们的用户代理。所以我创建了这个映射:
time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # Originally this included 'index': 'not analyzed' as well
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping, # Stored as a Unix timestamp
'Request Time': time_date_mapping, # Stored as a Unix timestamp
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
},
}
然后我将它输入到 ES 中:
es.index(
index=Visit_to_ElasticSearch.INDEX,
doc_type=Visit_to_ElasticSearch.DOC_TYPE,
id=self.uniqID,
timestamp=int(math.floor(self._visit['Entrance Time'])),
body=visit
)
当我在 ES 上查看索引中的数据时,只有 Entrance Time、_id、_type、domain 和 uniqID 被索引以供搜索(根据 Kibana)。所有数据都存在于文档中,但大多数字段显示 "Unindexed fields can not be searched."
此外,我试图只获取代理的饼图。但我无法想出可视化,因为无论我点击什么框,代理字段都不是聚合的选项。刚才提到它是因为索引的字段似乎确实显示了。
我试图模仿引入 github 的 elasticsearch.py 示例中的映射示例。有人可以纠正我如何使用该地图吗?
谢谢
------------映射------------
{
"visits": {
"mappings": {
"visit": {
"properties": {
"Agent": {
"type": "string"
},
"Entrance Time": {
"type": "date",
"format": "dateOptionalTime"
},
"Raw": {
"properties": {
"Entrance Time": {
"type": "double"
},
"domain": {
"type": "string"
},
"uniqID": {
"type": "string"
}
}
},
"Referrer": {
"type": "string"
},
"Request Time": {
"type": "string"
},
"Srvr IP": {
"type": "string"
},
"Visitor IP": {
"type": "string"
},
"domain": {
"type": "string"
},
"uniqID": {
"type": "string"
}
}
}
}
}
}
------------更新和新映射------------
所以我删除了索引并重新创建了它。在我对将数据映射到特定字段类型一无所知之前,原始索引中就有一些数据。这似乎解决了只有几个字段被索引的问题。
但是,我的部分映射似乎被忽略了。特别是代理字符串映射:
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string', 'index': 'not_analyzed' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping,
'Request Time': time_date_mapping,
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
},
}
这是http://localhost:9200/visits_test2/_mapping
{
"visits_test2": {
"mappings": {
"visit": {
"properties": {
"Agent":{"type":"string"},
"Entrance Time": {"type":"date","format":"dateOptionalTime"},
"Raw": {
"properties": {
"Entrance Time":{"type":"double"},
"domain":{"type":"string"},
"uniqID":{"type":"string"}
}
},
"Referrer":{"type":"string"},
"Request Time": {"type":"date","format":"dateOptionalTime"},
"Srvr IP":{"type":"string"},
"Visitor IP":{"type":"string"},
"domain":{"type":"string"},
"uniqID":{"type":"string"}
}
}
}
}
}
请注意,我使用了一个全新的索引。原因是我想确保没有任何东西从一个转移到另一个。
请注意,我正在使用 Python 库 elasticsearch.py 并遵循其映射语法示例。
---------- Python 根据评论请求将数据输入 ES 的代码 ----------
下面是一个文件名mapping.py,我还没有完全注释代码,因为这只是测试这种数据进入ES的方法是否可行的代码。如果它不是不言自明的,请告诉我,我会添加额外的评论。
请注意,我在 PHP 编程多年,然后才开始学习 Python。为了使用 Python 更快地起床和 运行 我创建了几个具有基本字符串和文件操作功能的文件,并将它们打包成一个包。它们是用 Python 编写的,旨在模仿内置 PHP 函数的行为。因此,当您看到对 php_basic_* 的调用时,它就是其中一个函数。
# Standard Library Imports
import json, copy, datetime, time, enum, os, sys, numpy, math
from datetime import datetime
from enum import Enum, unique
from elasticsearch import Elasticsearch
# My Library
import basicconfig, mybasics
from mybasics.cBaseClass import BaseClass, BaseClassErrors
from mybasics.cHelpers import HandleErrors, LogLvl
# This imports several constants, a couple of functions, and a helper class
from basicconfig.startup_config import *
# Connect to ElasticSearch
es = Elasticsearch([{'host': 'localhost', 'port': '9200'}])
# Create mappings of a visit
time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # This originally included 'index': 'not_analyzed' as well
visit_mapping = {
'properties': {
'uniqID': str_not_analyzed,
'pages': str_not_analyzed,
'domain': str_not_analyzed,
'Srvr IP': str_not_analyzed,
'Visitor IP': str_not_analyzed,
'Agent': { 'type': 'string', 'index': 'not_analyzed' },
'Referrer': { 'type': 'string' },
'Entrance Time': time_date_mapping,
'Request Time': time_date_mapping,
'Raw': { 'type': 'string', 'index': 'not_analyzed' },
'Pages': { 'type': 'string', 'index': 'not_analyzed' },
},
}
class Visit_to_ElasticSearch(object):
"""
"""
INDEX = 'visits'
DOC_TYPE = 'visit'
def __init__(self, fname, index=True):
"""
"""
self._visit = json.loads(php_basic_files.file_get_contents(fname))
self._pages = self._visit.pop('pages')
self.uniqID = self._visit['uniqID']
self.domain = self._visit['domain']
self.entrance_time = self._convert_time(self._visit['Entrance Time'])
# Get a list of the page IDs
self.pages = self._pages.keys()
# Extra IPs and such from a single page
page = self._pages[self.pages[0]]
srvr = page['SERVER']
req = page['REQUEST']
self.visitor_ip = srvr['REMOTE_ADDR']
self.srvr_ip = srvr['SERVER_ADDR']
self.request_time = self._convert_time(srvr['REQUEST_TIME'])
self.agent = srvr['HTTP_USER_AGENT']
# Now go grab data that might not be there...
self._extract_optional()
if index is True:
self.index_with_elasticsearch()
def _convert_time(self, ts):
"""
"""
try:
dt = datetime.fromtimestamp(ts)
except TypeError:
dt = datetime.fromtimestamp(float(ts))
return dt.strftime('%Y-%m-%dT%H:%M:%S')
def _extract_optional(self):
"""
"""
self.referrer = ''
def index_with_elasticsearch(self):
"""
"""
visit = {
'uniqID': self.uniqID,
'pages': [],
'domain': self.domain,
'Srvr IP': self.srvr_ip,
'Visitor IP': self.visitor_ip,
'Agent': self.agent,
'Referrer': self.referrer,
'Entrance Time': self.entrance_time,
'Request Time': self.request_time,
'Raw': self._visit,
'Pages': php_basic_str.implode(', ', self.pages),
}
es.index(
index=Visit_to_ElasticSearch.INDEX,
doc_type=Visit_to_ElasticSearch.DOC_TYPE,
id=self.uniqID,
timestamp=int(math.floor(self._visit['Entrance Time'])),
body=visit
)
es.indices.create(
index=Visit_to_ElasticSearch.INDEX,
body={
'settings': {
'number_of_shards': 5,
'number_of_replicas': 1,
}
},
# ignore already existing index
ignore=400
)
万一重要,这是我用来将数据转储到 ES 中的简单循环:
for f in all_files:
try:
visit = mapping.Visit_to_ElasticSearch(f)
except IOError:
pass
其中 all_files
是我的测试数据集中所有访问文件的列表(完整路径)。
这是来自 Google 机器人访问的示例访问文件:
{u'Entrance Time': 1407551587.7385,
u'domain': u'############',
u'pages': {u'6818555600ccd9880bf7acef228c5d47': {u'REQUEST': [],
u'SERVER': {u'DOCUMENT_ROOT': u'/var/www/####/',
u'Entrance Time': 1407551587.7385,
u'GATEWAY_INTERFACE': u'CGI/1.1',
u'HTTP_ACCEPT': u'*/*',
u'HTTP_ACCEPT_ENCODING': u'gzip,deflate',
u'HTTP_CONNECTION': u'Keep-alive',
u'HTTP_FROM': u'googlebot(at)googlebot.com',
u'HTTP_HOST': u'############',
u'HTTP_IF_MODIFIED_SINCE': u'Fri, 13 Jun 2014 20:26:33 GMT',
u'HTTP_USER_AGENT': u'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
u'PATH': u'/usr/local/bin:/usr/bin:/bin',
u'PHP_SELF': u'/index.php',
u'QUERY_STRING': u'',
u'REDIRECT_SCRIPT_URI': u'http://############/',
u'REDIRECT_SCRIPT_URL': u'############',
u'REDIRECT_STATUS': u'200',
u'REDIRECT_URL': u'############',
u'REMOTE_ADDR': u'############',
u'REMOTE_PORT': u'46271',
u'REQUEST_METHOD': u'GET',
u'REQUEST_TIME': u'1407551587',
u'REQUEST_URI': u'############',
u'SCRIPT_FILENAME': u'/var/www/PIAN/index.php',
u'SCRIPT_NAME': u'/index.php',
u'SCRIPT_URI': u'http://############/',
u'SCRIPT_URL': u'/############/',
u'SERVER_ADDR': u'############',
u'SERVER_ADMIN': u'admin@############',
u'SERVER_NAME': u'############',
u'SERVER_PORT': u'80',
u'SERVER_PROTOCOL': u'HTTP/1.1',
u'SERVER_SIGNATURE': u'<address>Apache/2.2.22 (Ubuntu) Server at ############ Port 80</address>\n',
u'SERVER_SOFTWARE': u'Apache/2.2.22 (Ubuntu)',
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'},
u'SESSION': {u'Entrance Time': 1407551587.7385,
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}}},
u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}
现在我更好地理解了为什么 Raw
字段是一个对象而不是简单的 string
,因为它被赋值 self._visit
而后者又被 json.loads(php_basic_files.file_get_contents(fname))
初始化。
无论如何,根据您在上面提供的所有信息,我认为映射从未通过 put_mapping
安装。从那时起,没有任何其他方法可以按照您喜欢的方式工作。我建议您修改代码以安装映射 ,然后 为您的第一个 visit
文档编制索引。