ElasticSearch:将旧访客数据放入索引

ElasticSearch: Getting old visitor data into an index

我正在学习 ElasticSearch,希望将我的业务数据转储到 ES 中并使用 Kibana 查看它。经过一周的各种问题后,我终于让 ES 和 Kibana 在 2 Ubuntu 14.04 台式机(集群)上工作(分别为 1.7.0 和 4)。

我现在遇到的问题是如何最好地将数据导入ES。数据流是我为每次访问具有唯一 ID 的文本文件捕获 PHP 全局变量 $_REQUEST 和 $_SERVER。从那里,如果他们填写表格,我会在一个文本文件中捕获该数据,该文本文件在不同的目录中也以该唯一 ID 命名。然后我的客户告诉我该表格填写是否有任何好处,最多延迟 50 天。

所以我从访问者数据开始 - $_REQUEST 和 $_SERVER。很多都是多余的,所以我真的只是试图捕获他们到达的时间戳、他们的 IP、他们访问的服务器的 IP、他们访问的域、唯一 ID 和他们的用户代理。所以我创建了这个映射:

time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # Originally this included 'index': 'not analyzed' as well

visit_mapping = {
    'properties': {
        'uniqID': str_not_analyzed,
        'pages': str_not_analyzed,
        'domain': str_not_analyzed,
        'Srvr IP': str_not_analyzed,
        'Visitor IP': str_not_analyzed,
        'Agent': { 'type': 'string' },
        'Referrer': { 'type': 'string' },
        'Entrance Time': time_date_mapping, # Stored as a Unix timestamp
        'Request Time': time_date_mapping, # Stored as a Unix timestamp
        'Raw': { 'type': 'string', 'index': 'not_analyzed' },
    },
}

然后我将它输入到 ES 中:

es.index(
            index=Visit_to_ElasticSearch.INDEX,
            doc_type=Visit_to_ElasticSearch.DOC_TYPE,
            id=self.uniqID,
            timestamp=int(math.floor(self._visit['Entrance Time'])),
            body=visit
        )

当我在 ES 上查看索引中的数据时,只有 Entrance Time、_id、_type、domain 和 uniqID 被索引以供搜索(根据 Kibana)。所有数据都存在于文档中,但大多数字段显示 "Unindexed fields can not be searched."

此外,我试图只获取代理的饼图。但我无法想出可视化,因为无论我点击什么框,代理字段都不是聚合的选项。刚才提到它是因为索引的字段似乎确实显示了。

我试图模仿引入 github 的 elasticsearch.py 示例中的映射示例。有人可以纠正我如何使用该地图吗?

谢谢

------------映射------------

{
  "visits": {
    "mappings": {
      "visit": {
        "properties": {
          "Agent": {
            "type": "string"
          },
          "Entrance Time": {
            "type": "date",
            "format": "dateOptionalTime"
          },
          "Raw": {
            "properties": {
              "Entrance Time": {
                "type": "double"
              },
              "domain": {
                "type": "string"
              },
              "uniqID": {
                "type": "string"
              }
            }
          },
          "Referrer": {
            "type": "string"
          },
          "Request Time": {
            "type": "string"
          },
          "Srvr IP": {
            "type": "string"
          },
          "Visitor IP": {
            "type": "string"
          },
          "domain": {
            "type": "string"
          },
          "uniqID": {
            "type": "string"
          }
        }
      }
    }
  }
}

------------更新和新映射------------

所以我删除了索引并重新创建了它。在我对将数据映射到特定字段类型一无所知之前,原始索引中就有一些数据。这似乎解决了只有几个字段被索引的问题。

但是,我的部分映射似乎被忽略了。特别是代理字符串映射:

visit_mapping = {
    'properties': {
        'uniqID': str_not_analyzed,
        'pages': str_not_analyzed,
        'domain': str_not_analyzed,
        'Srvr IP': str_not_analyzed,
        'Visitor IP': str_not_analyzed,
        'Agent': { 'type': 'string', 'index': 'not_analyzed' },
        'Referrer': { 'type': 'string' },
        'Entrance Time': time_date_mapping,
        'Request Time': time_date_mapping,
        'Raw': { 'type': 'string', 'index': 'not_analyzed' },
    },
}

这是http://localhost:9200/visits_test2/_mapping

的输出
{
  "visits_test2": {
    "mappings": {
        "visit": {
          "properties":  {
            "Agent":{"type":"string"},
            "Entrance Time": {"type":"date","format":"dateOptionalTime"},
            "Raw": {
              "properties": {
                "Entrance Time":{"type":"double"},
                "domain":{"type":"string"},
                "uniqID":{"type":"string"}
              }
            },
            "Referrer":{"type":"string"},
            "Request Time": {"type":"date","format":"dateOptionalTime"},
            "Srvr IP":{"type":"string"},
            "Visitor IP":{"type":"string"},
            "domain":{"type":"string"},
            "uniqID":{"type":"string"}
          }
        }
      }
    }
  }

请注意,我使用了一个全新的索引。原因是我想确保没有任何东西从一个转移到另一个。

请注意,我正在使用 Python 库 elasticsearch.py 并遵循其映射语法示例。

---------- Python 根据评论请求将数据输入 ES 的代码 ----------

下面是一个文件名mapping.py,我还没有完全注释代码,因为这只是测试这种数据进入ES的方法是否可行的代码。如果它不是不言自明的,请告诉我,我会添加额外的评论。

请注意,我在 PHP 编程多年,然后才开始学习 Python。为了使用 Python 更快地起床和 运行 我创建了几个具有基本字符串和文件操作功能的文件,并将它们打包成一个包。它们是用 Python 编写的,旨在模仿内置 PHP 函数的行为。因此,当您看到对 php_basic_* 的调用时,它就是其中一个函数。

# Standard Library Imports
import json, copy, datetime, time, enum, os, sys, numpy, math
from datetime import datetime
from enum import Enum, unique
from elasticsearch import Elasticsearch

# My Library
import basicconfig, mybasics
from mybasics.cBaseClass import BaseClass, BaseClassErrors
from mybasics.cHelpers import HandleErrors, LogLvl

# This imports several constants, a couple of functions, and a helper class
from basicconfig.startup_config import *

# Connect to ElasticSearch
es = Elasticsearch([{'host': 'localhost', 'port': '9200'}])

# Create mappings of a visit
time_date_mapping = { 'type': 'date_time' }
str_not_analyzed = { 'type': 'string'} # This originally included 'index': 'not_analyzed' as well

visit_mapping = {
    'properties': {
        'uniqID': str_not_analyzed,
        'pages': str_not_analyzed,
        'domain': str_not_analyzed,
        'Srvr IP': str_not_analyzed,
        'Visitor IP': str_not_analyzed,
        'Agent': { 'type': 'string', 'index': 'not_analyzed' },
        'Referrer': { 'type': 'string' },
        'Entrance Time': time_date_mapping,
        'Request Time': time_date_mapping,
        'Raw': { 'type': 'string', 'index': 'not_analyzed' },
        'Pages': { 'type': 'string', 'index': 'not_analyzed' },
    },
}


class Visit_to_ElasticSearch(object):
    """

    """

    INDEX = 'visits'
    DOC_TYPE = 'visit'



    def __init__(self, fname, index=True):
        """

        """

        self._visit = json.loads(php_basic_files.file_get_contents(fname))
        self._pages = self._visit.pop('pages')

        self.uniqID = self._visit['uniqID']
        self.domain = self._visit['domain']
        self.entrance_time = self._convert_time(self._visit['Entrance Time'])

        # Get a list of the page IDs
        self.pages = self._pages.keys()

        # Extra IPs and such from a single page
        page = self._pages[self.pages[0]]
        srvr = page['SERVER']
        req = page['REQUEST']

        self.visitor_ip = srvr['REMOTE_ADDR']
        self.srvr_ip = srvr['SERVER_ADDR']
        self.request_time = self._convert_time(srvr['REQUEST_TIME'])

        self.agent = srvr['HTTP_USER_AGENT']

        # Now go grab data that might not be there...
        self._extract_optional()

        if index is True:
            self.index_with_elasticsearch()


    def _convert_time(self, ts):
        """

        """

        try:
            dt = datetime.fromtimestamp(ts)
        except TypeError:
            dt = datetime.fromtimestamp(float(ts))

        return dt.strftime('%Y-%m-%dT%H:%M:%S')         


    def _extract_optional(self):
        """

        """

        self.referrer = ''


    def index_with_elasticsearch(self):
        """

        """

        visit = {
            'uniqID': self.uniqID,
            'pages': [],
            'domain': self.domain,
            'Srvr IP': self.srvr_ip,
            'Visitor IP': self.visitor_ip,
            'Agent': self.agent,
            'Referrer': self.referrer,
            'Entrance Time': self.entrance_time,
            'Request Time': self.request_time,
            'Raw': self._visit,
            'Pages': php_basic_str.implode(', ', self.pages),
        }

        es.index(
            index=Visit_to_ElasticSearch.INDEX,
            doc_type=Visit_to_ElasticSearch.DOC_TYPE,
            id=self.uniqID,
            timestamp=int(math.floor(self._visit['Entrance Time'])),
            body=visit
        )   


es.indices.create(
    index=Visit_to_ElasticSearch.INDEX,
    body={
        'settings': {
            'number_of_shards': 5,
            'number_of_replicas': 1,
        }
    },
    # ignore already existing index
    ignore=400
)

万一重要,这是我用来将数据转储到 ES 中的简单循环:

for f in all_files:
    try:
        visit = mapping.Visit_to_ElasticSearch(f)
    except IOError:
        pass

其中 all_files 是我的测试数据集中所有访问文件的列表(完整路径)。

这是来自 Google 机器人访问的示例访问文件:

{u'Entrance Time': 1407551587.7385,
     u'domain': u'############',
     u'pages': {u'6818555600ccd9880bf7acef228c5d47': {u'REQUEST': [],
       u'SERVER': {u'DOCUMENT_ROOT': u'/var/www/####/',
        u'Entrance Time': 1407551587.7385,
        u'GATEWAY_INTERFACE': u'CGI/1.1',
        u'HTTP_ACCEPT': u'*/*',
        u'HTTP_ACCEPT_ENCODING': u'gzip,deflate',
        u'HTTP_CONNECTION': u'Keep-alive',
        u'HTTP_FROM': u'googlebot(at)googlebot.com',
        u'HTTP_HOST': u'############',
        u'HTTP_IF_MODIFIED_SINCE': u'Fri, 13 Jun 2014 20:26:33 GMT',
        u'HTTP_USER_AGENT': u'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
        u'PATH': u'/usr/local/bin:/usr/bin:/bin',
        u'PHP_SELF': u'/index.php',
        u'QUERY_STRING': u'',
        u'REDIRECT_SCRIPT_URI': u'http://############/',
        u'REDIRECT_SCRIPT_URL': u'############',
        u'REDIRECT_STATUS': u'200',
        u'REDIRECT_URL': u'############',
        u'REMOTE_ADDR': u'############',
        u'REMOTE_PORT': u'46271',
        u'REQUEST_METHOD': u'GET',
        u'REQUEST_TIME': u'1407551587',
        u'REQUEST_URI': u'############',
        u'SCRIPT_FILENAME': u'/var/www/PIAN/index.php',
        u'SCRIPT_NAME': u'/index.php',
        u'SCRIPT_URI': u'http://############/',
        u'SCRIPT_URL': u'/############/',
        u'SERVER_ADDR': u'############',
        u'SERVER_ADMIN': u'admin@############',
        u'SERVER_NAME': u'############',
        u'SERVER_PORT': u'80',
        u'SERVER_PROTOCOL': u'HTTP/1.1',
        u'SERVER_SIGNATURE': u'<address>Apache/2.2.22 (Ubuntu) Server at ############ Port 80</address>\n',
        u'SERVER_SOFTWARE': u'Apache/2.2.22 (Ubuntu)',
        u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'},
       u'SESSION': {u'Entrance Time': 1407551587.7385,
        u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}}},
     u'uniqID': u'bbc398716f4703cfabd761cc8d4101a1'}

现在我更好地理解了为什么 Raw 字段是一个对象而不是简单的 string,因为它被赋值 self._visit 而后者又被 json.loads(php_basic_files.file_get_contents(fname)) 初始化。

无论如何,根据您在上面提供的所有信息,我认为映射从未通过 put_mapping 安装。从那时起,没有任何其他方法可以按照您喜欢的方式工作。我建议您修改代码以安装映射 ,然后 为您的第一个 visit 文档编制索引。