Return Python 中 Splunk 搜索的错误数

Return Number of Errors From Splunk Search in Python

有什么方法可以获取在使用 splunklib.results 模块或任何 splunklib 模块进行 Splunk 搜索期间发生的错误数?

下面是我目前的代码:

#purpose of script: To connect to Splunk, execute a query, and write the query results out to an excel file.
#query results = multiple dynamic # of rows. 7 columns. 

#!/usr/bin/env python
import splunklib.client as client #splunklib.client class is used to connect to splunk, authenticate, and maintain session
import splunklib.results as results #module for returning results and printing/writing them out

listOfAppIDs = []
#open file to read each line and add each line in file to an array. These are our appID's to search
with open('filelocation.txt', 'r') as fi:
    for line in fi:
        listOfAppIDs.append(line.rstrip('\n'))
print listOfAppIDs

#identify variables used to log in
HOST = "8.8.8.8"
PORT = 8089
USERNAME = "uName"
PASSWORD = "pWord"

startPoint = "appID1" #initial start point in array

outputCsv = open('filelocation.csv', 'wb')
fieldnames = ['Application ID', 'transport', 'dst_port', 'Average Throughput per Month','Total Sessions Allowed', 'Unique Source IPs', 'Unique Destination IPs']
writer = csv.DictWriter(outputCsv, fieldnames=fieldnames)
writer.writeheader();

def connect():
    global startPoint , item
    print "startPoint: " + startPoint

    #Create a service instance by using the connect function and log in
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD,
        autologin=True
    )   
    jobs = service.jobs# Get the collection of jobs/searches
    kwargs_blockingsearch = {"exec_mode": "normal"}

    try:
        for item in listOfAppIDs:
            errorCount=0
            print "item: " + item
            if (item >= startPoint):    
                searchquery_blocking = "search splunkQery"
                print item + ':'
                job = jobs.create(searchquery_blocking, **kwargs_blockingsearch) # A blocking search returns query result. Search executes here
                print "Splunk query for appID " , item , " completed! \n"
                resultCount = job["resultCount"] #number of results this job (splunk query) returned
                print "result count " , resultCount
                rr = results.ResultsReader(job.results())
                for result in rr:
                    if isinstance(result, results.Message):
                        # Diagnostic messages may be returned in the results
                        # Check the type and do something.
                        if result.type == log_type:
                            print '%s: %s' % (result.type, result.message)
                            errorCount+=1
                    elif isinstance(result, dict):
                        # Normal events are returned as dicts
                        # Do something with them if required.
                        print result
                        writer.writerow([result + errorCount])
                        pass
                assert rr.is_preview == False
    except:
        print "\nexcept\n"
        startPoint = item #returh to connect function but start where startPoint is at in array
        connect()

   print "done!"    

connect()

上面的代码出现以下错误:

'OrderedDict' object has no attribute 'messages'

from splunklib import results
my_feed=results.ResultsReader(open("results.xml"))

log_type='ERROR'

n_errors=0
for result in my_feed.results:
    if isinstance(result, results.Message):
       if result.type==log_type:
          print result.message
          n_errors+=1

您可能对 data.load() 有疑问,因为它需要具有单个根节点的 xml。如果您在一个提要中有多个结果节点,则可以绕过此包装您的提要,即:"<root>+open("feed.xml").read()</root>"

如果您可以访问原始提要而不是数据对象,您可以使用 lxml insted of splunk lib

len( lxml.etree.parse("results.xml").findall("//messages/msg[@type='ERROR']") )

以下是基于splunklib文档的完整示例。 ResultsReader 解析原子提要并为您调用每个结果的 data.load()

      import splunklib.client as client
      import splunklib.results as results
      from time import sleep

      log_type='ERROR'

      service = client.connect(...)
      job = service.jobs.create("search * | head 5")
      while not job.is_done():
          sleep(.2)
      rr = results.ResultsReader(job.results())
      for result in rr:
          if isinstance(result, results.Message):
              # Diagnostic messages may be returned in the results
              # Check the type and do something.
              if result.type == log_type:
                 print '%s: %s' % (result.type, result.message)
          elif isinstance(result, dict):
              # Normal events are returned as dicts
              # Do something with them if required.
              pass
      assert rr.is_preview == False