Python 多处理 - 写入 JSON 文件

Python Multiprocessing - Writing to a JSON file

我有一个要求,我正在阅读来自 Rest-API 的 Order Dictionary,如下所示:-

OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947736C'), ('_cd', '1809:718727065')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947737C'), ('_cd', '1809:718727067')])

我的要求是读取 Ordered Dictionary 并将多处理中 JSON 格式的数据写入 JSON 文件。但是我的代码无法正常工作,它没有将 JSON 格式的数据写入我的目标文件。请提出建议。

代码如下:-


from multiprocessing import Pool
from collections import OrderedDict
import simplejson as json

rr = OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])

f = open('iitp222.json', "a")

def write_data(args):
    f.write(args + '\n')

###Get the results and display them using the ResultsReader.
if __name__ == "__main__":
    for result in rr:
            print result
            p = Pool()
            result = p.map(write_data, json.dumps(result))
            p.close()
            p.join()
    f.close()

我可以通过以下代码解决我的问题

#------------Import Lib-----------------------#
import splunklib.results as results
from collections import OrderedDict
import splunklib.client  as client
import simplejson as json, sys
from datetime import datetime
import multiprocessing as mp

fn=sys.argv[1]
HOST = "restapi.xxxx.com"
PORT = 443

#----Capturing Current Hour & Min--------------#
Cur_min1 = datetime.now().strftime('%M')
Cur_min = int(Cur_min1)/2

#----Evaluating time to flip different users --- #
if int(Cur_min) % 4 == 0:
        USERNAME = "xxxxxx"
        PASSWORD = "yyyyyy"
elif int(Cur_min) % 4 == 1:
        USERNAME = "xxxxxx"
        PASSWORD = "yyyyyy"
elif int(Cur_min) % 4 == 2:
        USERNAME = "kuuuu1"
        PASSWORD = "yyyyyy"
else:
        USERNAME = "xxxx"
        PASSWORD = "yyyyyy"

# Create a Service instance and log in
try:
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)
    rr = results.ResultsReader(service.jobs.export("search index=xxx host=yyyyyy* sourcetype=xxxx splunk_server=idx* earliest=-2m@m"))
    f1=open(fn, 'w')
    f1.close()
except:
    os.system("python /home/xxx/MS_TeamsNotification.py 'Unable to connect Splunk Rest-API'")
    exit()

###Get the results and display them using the ResultsReader.

def worker(arg, q):
    res = json.dumps(arg)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while 1:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for result in rr:
        if isinstance(result, dict):
            job = pool.apply_async(worker, (result, q))
            jobs.append(job)
    assert rr.is_preview == False

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
   main()