如何在使用多处理时将数据添加到 json 文件?

How to add data to a json file while making use of multiproccesing?

我正在使用名为 TinyDB 的用户友好型 json 面向文档的数据库。但是我无法将多条数据添加到我的数据库中,因为我正在使用多进程。一段时间后,我收到错误,id x 已经存在于数据库中(这是因为 2 个或更多进程试图同时添加数据)。有什么办法可以解决吗?

每 运行 我插入新的唯一参数。

示例参数:

params = {'id' = 1, 'name': 'poop', 'age': 99}

代码:

resultsDb = TinyDB('db/resultsDb.json')

def run(params):
    resultsDb.insert({'id': params['id'], 'name': params['name'], 'age': params['age']})

maxProcesses = 12 # Cores in my pc

for i in range(maxProcesses):
    processes.append(Process(target=run, args=(params,)))

for p in processes:
    p.start()

for p in processes:
    p.join()

如果您想并行写入数据,则需要将问题分解为更小的步骤,以确保插入数据的步骤已经将所有内容合并在一起。这样你就不会在写入时遇到任何(明显的)线程安全问题。

例如,假设您的 JSON 文件具有三个字段,emailnameage,并且您希望在 email 上强制执行唯一性, 但有些记录是重复输入的。例如,有一个条目有 emily@smith.com 和她的名字,另一个条目有她的年龄。

你首先要做一些事情来将所有东西组合在一起,然后并行化写入。

我将绘制一些代码(请注意我还没有测试过!):

my_data = # some JSON data
grouped = {}

for datum in my_data:
    if datum['email'] in grouped:
        grouped[datum['email']].update(datum)
    else:
        grouped[datum['email']] = datum

# parallelize write as above

我无法在我有权访问的 Linux 系统上对此进行测试,因为它是一个共享服务器,在该服务器上 运行 代码所需的某些设施已被禁止访问。这是下面的 Windows 版本。但主要特点是:

  1. 它使用 Lock 来确保插入被序列化,我认为这是 运行 没有错误所必需的。当然,这违背了并行化代码的目的,可以得出结论,使用多处理或多线程确实没有意义。
  2. 在 Windows 中,我不必将 resultsDb = TinyDB('db.json') 语句移动到 run 函数中,因为在 spawn 用于创建新进程的平台上,例如作为 Windows,如果我将该语句保留在全局范围内,它无论如何都会为每个新创建的进程执行。但是,对于 Linux,其中 fork 用于创建新进程,它不会为每个新进程执行,而是每个新进程将继承主进程打开的单个数据库。这可能有效也可能无效——您可以在全局范围内使用语句尝试两种方式。如果将它放回全局范围以查看它是否在那里工作,则不需要在源代码底部使用相同的语句。
from tinydb import TinyDB
from multiprocessing import Process, Lock


def run(lock, params):
    resultsDb = TinyDB('db/resultsDb.json')
    with lock:
        resultsDb.insert({'id': params['id'], 'name': params['name'], 'age': params['age']})
    print('Successfully inserted.')

# required by Windows:
if __name__ == '__main__':
    params = {'id': 1, 'name': 'poop', 'age': 99}

    maxProcesses = 12 # Cores in my pc

    lock = Lock()
    processes = []
    for i in range(maxProcesses):
        processes.append(Process(target=run, args=(lock, params)))

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    # remove the following if the first one is at global scope:
    resultsDb = TinyDB('db/resultsDb.json')
    print(resultsDb.all())

打印:

Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
Successfully inserted.
[{'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}, {'id': 1, 'name': 'poop', 'age': 99}]