Python MongoDB query requests segment data into chunks

I am writing a Python script that performs the following steps:

1. Query a MongoDB database
2. Parse and aggregate the results
3. Upload the data to a ServiceNow table via a REST API

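The script works, but the data set is so large that the REST transaction times out after 60 seconds (the connection is closed by the ServiceNow server at the destination).

I need to split the data into chunks and send a separate REST transaction for each chunk, so that the complete data set is sent via POST without hitting the timeout limit.

How can I achieve this by modifying the script below?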

#!/usr/bin/env python

from config import *

import os, sys

mypath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(mypath, "api-python-client"))

from apiclient.mongo import *

from pymongo import MongoClient

import json

import requests

from bson.json_util import dumps

client = MongoClient(mongo_uri)

#Create ServiceNow URL
svcnow_url = create_svcnow_url('u_imp_cmps')

#BITSDB Nmap Collection
db = client[mongo_db]

#Aggregate - RDBMS equivalent to Alias select x as y
#Rename fields to match ServiceNow field names
computers = db['computer'].aggregate([
        {"$unwind": "$hostnames"},
        {"$project" : {
                "_id":0,
                "u_hostname": "$hostnames.name",
                "u_ipv4": "$addresses.ipv4",
                "u_status": "$status.state",
                "u_updated_timestamp": "$last_seen"
        }}

])

j = dumps({"records":computers})
#print(j)


#Set proper headers
headers = {"Content-Type":"application/json","Accept":"application/json"}

#Build HTTP Request
response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers ,data=j)

#Check for HTTP codes other than 200
if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text, 'Error Response:',response.json())
        exit()

#Decode the JSON response into a dictionary and use the data
print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())

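Update: I have a plan, but I'm not sure exactly how to implement it.

https://docs.mongodb.com/v3.0/reference/method/cursor.batchSize/

Basically, I think I can solve this by creating batches and looping over them, making a new API call for each one. If anyone has thoughts on whether this is a good plan and how to implement it, please let me know. Thanks.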

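j = dumps({"records":computers}) returns a single JSON string, not a list, so indexing it with j[x] or looping over it would only give you individual characters. The aggregation cursor computers, on the other hand, can be turned into a list of records, and each record can then be serialized and posted on its own. Each of these entries should be accepted by ServiceNow, assuming the endpoint accepts the same {"records": [...]} wrapper with a single record inside it.

# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type":"application/json","Accept":"application/json"}

# Materialize the cursor into a list. This replaces the
# j = dumps(...) line, because a cursor can only be consumed once.
records = list(computers)

for data_point in records:

    # Serialize one record, keeping the "records" wrapper of the original payload
    payload = dumps({"records": [data_point]})

    #Build HTTP Request (Note we are using payload instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers ,data=payload)

    #Check for HTTP codes other than 200
    if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text, 'Error Response:',response.json())
    else:
        # This is a response of success for a single record
        print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())

exit()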

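If there are 100 new entries in MongoDB, this will make 100 POST calls to ServiceNow. Your ServiceNow instance should be able to handle the load, and you can very easily identify the records that failed to load.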
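Identifying the records that failed could look something like the sketch below. It is only an illustration under assumptions: upload_each and its send parameter are hypothetical names, and send stands for any callable that takes one record and returns the HTTP status code of the request made for it (in the script it would wrap the requests.post call). A fake sender is used so the example runs without a ServiceNow instance.

```python
def upload_each(records, send):
    """Send records one at a time and return those that did not get HTTP 200.

    `send` is a hypothetical callable: it receives one record and returns
    the HTTP status code of the request made for that record.
    """
    failed = []
    for record in records:
        status = send(record)
        if status != 200:
            # Keep the status next to the record so it can be retried or logged
            failed.append((status, record))
    return failed


def fake_send(record):
    # Stand-in for the real POST: pretend records without a hostname are rejected
    return 200 if record.get("u_hostname") else 400


records = [{"u_hostname": "web01"}, {"u_hostname": ""}, {"u_hostname": "db01"}]
failed = upload_each(records, fake_send)
print(failed)  # prints [(400, {'u_hostname': ''})]
```

Collecting failures instead of exiting on the first error lets the remaining records go through and leaves a list that can be retried afterwards.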

However, if for any reason you need to reduce the number of calls, I suggest splitting your list into sublists, for example with a list-slicing one-liner like this:

# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type":"application/json","Accept":"application/json"}

# Each POST will send up to 10 records of data
split_size = 10

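# Materialize the cursor into a list so it can be sliced. This replaces the
# j = dumps(...) line, because a cursor can only be consumed once.
records = list(computers)

# Note the two places where our split_size variable is used
for data_point in [records[x:x+split_size] for x in range(0, len(records), split_size)]:

    # Serialize this batch, keeping the "records" wrapper of the original payload
    payload = dumps({"records": data_point})

    #Build HTTP Request (Note we are using payload instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers ,data=payload)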

    #Check for HTTP codes other than 200
    if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text, 'Error Response:',response.json())
    else:
        # This is a response of success for one batch of up to split_size records
        print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())

exit()
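If the result set is too large to hold in memory as a list, the cursor can also be consumed lazily in fixed-size batches. The sketch below is a minimal illustration, not part of the original script: the helper name chunked is hypothetical, and it relies only on itertools.islice from the standard library. Note that cursor.batchSize(), mentioned in the question, only controls how many documents the driver fetches from the server per round trip; iterating the cursor still yields one document at a time, so the grouping has to happen in the application.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items from any iterable (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice pulls the next `size` items without consuming the rest
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for the aggregation cursor; any iterable of documents works
sample_docs = ({"u_hostname": "host%d" % i} for i in range(7))
batches = list(chunked(sample_docs, 3))
print([len(b) for b in batches])  # prints [3, 3, 1]
```

In the script this would be used as for batch in chunked(computers, split_size), with dumps({"records": batch}) as the body of each POST. Whether the endpoint accepts several records per request is the same assumption the original payload already makes.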