Python MongoDB query requests segment data into chunks
I'm writing a Python script that performs the following steps:
- Query a MongoDB database
- Parse and aggregate the results
- Upload the data to a ServiceNow table via the REST API

The script works, but the dataset is too large and the REST transaction times out after 60 seconds (the connection is closed by the destination ServiceNow server).
I need to split the data into chunks and send a separate REST transaction for each chunk, so that the full dataset is POSTed without hitting the timeout limit.
How can I modify the script below to accomplish this?
#!/usr/bin/env python
from config import *
import os, sys
mypath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(mypath, "api-python-client"))
from apiclient.mongo import *
from pymongo import MongoClient
import json
import requests
from bson.json_util import dumps

client = MongoClient(mongo_uri)

#Create ServiceNow URL
svcnow_url = create_svcnow_url('u_imp_cmps')

#BITSDB Nmap Collection
db = client[mongo_db]

#Aggregate - RDBMS equivalent to Alias select x as y
#Rename fields to match ServiceNow field names
computers = db['computer'].aggregate([
    {"$unwind": "$hostnames"},
    {"$project": {
        "_id": 0,
        "u_hostname": "$hostnames.name",
        "u_ipv4": "$addresses.ipv4",
        "u_status": "$status.state",
        "u_updated_timestamp": "$last_seen"
    }}
])

j = dumps({"records": computers})
#print(j)

#Set proper headers
headers = {"Content-Type": "application/json", "Accept": "application/json"}

#Build HTTP Request
response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers, data=j)

#Check for HTTP codes other than 200
if response.status_code != 200:
    print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text:', response.text, 'Error Response:', response.json())
    exit()

#Decode the JSON response into a dictionary and use the data
print('Status:', response.status_code, 'Headers:', response.headers, 'Response:', response.json())
Update: I have a plan, but I'm not sure exactly how to implement it.
- Set the cursor to a fixed batch size of 1000 records each
- When a batch is full, build the JSON output and send the data via requests
- In a loop: keep grabbing new batches and sending each one to the destination until the entire dataset has been covered
https://docs.mongodb.com/v3.0/reference/method/cursor.batchSize/
Basically I think I can solve this by creating batches and looping over them, making a new API call each time. If anyone has thoughts on whether this is a good plan and how to implement the solution, please let me know. Thanks.
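The batch-and-loop plan above can be sketched with a small helper (a hypothetical illustration — `iter_batches` is not a PyMongo API; it chunks any iterator, including an aggregation cursor):

```python
from itertools import islice

def iter_batches(cursor, size=1000):
    """Yield successive lists of up to `size` items from any iterable,
    e.g. a PyMongo aggregation cursor."""
    it = iter(cursor)
    while True:
        batch = list(islice(it, size))
        if not batch:
            break
        yield batch

# Each batch would then be serialized and POSTed separately, e.g.:
# for batch in iter_batches(computers, 1000):
#     requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd),
#                   headers=headers, data=dumps({"records": batch}))
```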
Note that j = dumps({"records": computers})
returns a JSON string, not a list. Materialize the aggregation cursor into a list instead, so you can easily point to an individual data entry by calling records[x]
or by iterating in a for loop. Each of these entries, wrapped as its own payload, should be accepted by ServiceNow.
# Materialize the cursor so records can be indexed and iterated
records = list(computers)

# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type": "application/json", "Accept": "application/json"}

for record in records:
    #Build HTTP Request (Note we send one record per request instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers, data=dumps({"records": [record]}))
    #Check for HTTP codes other than 200
    if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text:', response.text, 'Error Response:', response.json())
    else:
        # This is a success response for a single record
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response:', response.json())
exit()
If there are 100 new entries in MongoDB, this will make 100 POST calls to ServiceNow. Your ServiceNow instance should be able to handle the load, and you can very easily identify which records failed to load.
However, if you need to reduce the number of calls for any reason, I suggest splitting your list into sublists, e.g. with the one-liner featured in this answer:
# Materialize the cursor so it can be sliced into sublists
records = list(computers)

# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type": "application/json", "Accept": "application/json"}

# Each POST will send up to 10 records of data
split_size = 10

# Note the two places where our split_size variable is used
# (use range here; xrange only exists in Python 2)
for batch in [records[x:x+split_size] for x in range(0, len(records), split_size)]:
    #Build HTTP Request (Note we send one batch per request instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers, data=dumps({"records": batch}))
    #Check for HTTP codes other than 200
    if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text:', response.text, 'Error Response:', response.json())
    else:
        # This is a success response for a single batch
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response:', response.json())
exit()