Splunk Python SDK API job.results limited to 50k results; trying to set an offset to pull multiple 50k chunks, but I can't get it to work
I have a job whose ['resultCount'] is 367k, but no matter what I do I can't seem to pull back more than the first 50,000 results.
I took this code from the answer here, for anyone with a similar end goal and setup: https://answers.splunk.com/answers/114045/python-sdk-results-resultsreader-extremely-slow.html
rs = job.results(count=maxRecords, offset=self._offset)
results.ResultsReader(io.BufferedReader(ResponseReaderWrapper(rs)))
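(ResponseReaderWrapper is not part of splunklib; the linked answer defines it as a small io.RawIOBase adapter so the SDK's raw response reader can be wrapped in io.BufferedReader. Roughly this, paraphrased rather than copied verbatim from that post:

import io

class ResponseReaderWrapper(io.RawIOBase):
    """Adapts the SDK's response reader so io.BufferedReader can buffer it."""
    def __init__(self, response_reader):
        self.response_reader = response_reader

    def readable(self):
        return True

    def close(self):
        self.response_reader.close()

    def readinto(self, b):
        data = self.response_reader.read(len(b))  # pull up to len(b) bytes from Splunk
        b[:len(data)] = data
        return len(data)
)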
I wrote the code below around it and have fiddled with it, but I can't get offset=self._offset to do anything, and I don't know what it's supposed to do.
class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        json_log.debug('Starting %s customerGuid=%s' % (label, self.customerGuid))
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)

        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug("\n\nDone!\n\n")
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)
            except binding.HTTPError as e:
                json_log.debug(str(e))
                pass
            except AttributeError:
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                json_log.debug(stats)
                if stats["isDone"] == "1":
                    json_log.debug('Search %s finished for customerGuid=%s'
                                   % (label, customerGuid))
                    break
                sleep(2)

        # Get the results and display them
        result_count = job['resultCount']
        rs = job.results(count=0)
        rr = results.ResultsReader(io.BufferedReader(rs))
        results_list = []
        for result in rr:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                json_log.debug('%s: %s label=%s customerGuid=%s'
                               % (result.type, result.message, label, customerGuid))
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []
                for header in search_headers:
                    if header not in result.keys():
                        print(header)
                        result[header] = ''
                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if not results_list == []:
                    results_list.append(values)
                else:
                    results_list.append(keys)
                    results_list.append(values)
        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        assert rr.is_preview is False
        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def searches(self, query_list):
        print(query_list)
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()
        print(results)
        search_results = {item[0]: item[1] for item in results}
        print(search_results)
        return search_results
I was able to get this working. The code below should demonstrate how it's done.
import io
import csv
from time import sleep
import splunklib.results as results
import splunklib.client as client
import splunklib.binding as binding
from multiprocessing.pool import ThreadPool
from contextlib import closing
class SplunkConnector(object):
    def __init__(self, username, password, customerGuid):
        self.username = username
        self.password = password
        self.customerGuid = customerGuid
        flag = True
        while flag:
            try:
                self.service = client.connect(host=*****, port=8089, username=self.username, password=self.password, scheme='https')
                flag = False
            except binding.HTTPError as e:
                json_log.debug(str(e))  # json_log: logger configured elsewhere

    def search(self, query_dict):
        query = query_dict['search']
        label = query_dict['label']
        search_headers = query_dict['headers']
        customer = query_dict['customer']
        customerGuid = query_dict['customerGuid']
        try:
            earliest_time = query_dict['earliest_time']
            latest_time = query_dict['latest_time']
        except KeyError:
            earliest_time = '-1d@d'
            latest_time = '@d'
        kwargs_normalsearch = {'exec_mode': 'normal', 'earliest_time': earliest_time, 'latest_time': latest_time, 'output_mode': 'csv'}
        flag = True
        while flag:
            try:
                job = self.service.jobs.create(query + ' | fillnull value="---"', **kwargs_normalsearch)
                flag = False
            except binding.HTTPError as e:
                pass

        while True:
            try:
                while not job.is_ready():
                    pass
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100,
                         "scanCount": int(job["scanCount"]),
                         "eventCount": int(job["eventCount"]),
                         "resultCount": int(job["resultCount"])}
                if stats["isDone"] == "1":
                    break
                sleep(2)
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                if stats["isDone"] == "1":
                    break
                sleep(2)
            except binding.HTTPError as e:
                pass
            except AttributeError:
                stats = {"isDone": job["isDone"],
                         "label": label,
                         "customer": customer,
                         "customerGuid": customerGuid,
                         "doneProgress": float(job["doneProgress"]) * 100}
                if stats["isDone"] == "1":
                    break
                sleep(2)

        # Page through the finished job 50,000 rows at a time
        result_count = job['resultCount']
        offset = 0
        count = 50000
        results_list = self.results_getter(job, label, customerGuid, search_headers, True, count, offset, result_count)
        while len(results_list) < int(result_count) + 1:  # +1 accounts for the header row
            offset += count
            placeholder = self.results_getter(job, label, customerGuid, search_headers, False, count, offset, result_count)
            results_list.extend(placeholder)
        output = io.BytesIO()
        writer = csv.writer(output, delimiter=',')
        writer.writerows(results_list)
        output_string = output.getvalue()
        job.cancel()
        return [label, output_string.replace('\r\n', '\n').replace('---', '')]

    def results_getter(self, job, label, customerGuid, search_headers, first, count, offset, result_count):
        # Get one page of results and collect the requested columns
        kwargs_paginate = {"count": count,
                           "offset": offset}
        blocksearch_results = job.results(**kwargs_paginate)
        results_list = []
        reader = results.ResultsReader(blocksearch_results)
        for result in reader:
            if isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results; skip them
                pass
            elif isinstance(result, dict):
                # Normal events are returned as dicts
                keys, values = [], []
                for header in search_headers:
                    if header not in result.keys():
                        result[header] = ''
                for key, value in result.items():
                    if key in search_headers:
                        keys.append(str(key))
                        values.append(str(value))
                if not results_list == []:
                    results_list.append(values)
                elif first:
                    results_list.append(keys)
                    results_list.append(values)
                else:
                    results_list.append(values)
        assert not reader.is_preview
        return results_list

    def searches(self, query_list):
        if type(query_list) == dict:
            query_list = [value for value in query_list.values()]
        with closing(ThreadPool(processes=len(query_list))) as pool:
            results = pool.map(self.search, query_list)
            pool.terminate()
        search_results = {item[0]: item[1] for item in results}
        return search_results
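The core of the fix is just pagination: a single request to the job's results endpoint will not return more than 50,000 rows (the server-side maxresultrows limit by default), so you ask for pages with count and offset until you have retrieved all int(job['resultCount']) rows. The +1 in the while condition above accounts for the header row that the first page contributes. Stripped of the CSV bookkeeping, the loop looks roughly like this (iter_all_results is an illustrative helper name, not part of the SDK):

import splunklib.results as results

def iter_all_results(job, page_size=50000):
    # Yield every result dict from a finished search job, one page at a time.
    total = int(job['resultCount'])
    offset = 0
    while offset < total:
        page = job.results(count=page_size, offset=offset)
        for item in results.ResultsReader(page):
            if isinstance(item, dict):  # skip results.Message diagnostics
                yield item
        offset += page_size

Calling list(iter_all_results(job)) on a finished job should then give you all 367k results instead of stopping at the first 50,000.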