与多个 Python 脚本共享字典

Share a dict with multiple Python scripts

我希望可以同时从多个 Python 脚本 运行 访问一个独特的 dict (key/value) 数据库。

如果script1.py更新d[2839],那么script2.py几秒后查询d[2839]时应该看到修改后的值 .

什么是 Pythonic 解决方案?

注意:我在 Windows 上,字典最多应包含 1M 项(键和值均为整数)。

您可以使用基于文档的数据库管理器。也许对您的系统来说太重了,但并发访问通常是数据库管理系统和 API 连接到它们的原因之一。

我已经将 MongoDB 与 Python 一起使用,效果很好。 Python API 文档非常好,每个文档(数据库的元素)都是一个可以加载到 python 的字典。

我会使用 pub/sub websocket-framework,比如 Autobahn/Python,一个脚本作为 "server",它处理所有文件通信,但这取决于规模,也许这可以矫枉过正。

除了 SQLite 之外,大多数嵌入式数据存储都没有针对并发访问进行优化,我也对 SQLite 的并发性能感到好奇,所以我做了一个基准测试:

import time
import sqlite3
import os
import random
import sys
import multiprocessing


class Store():

    def __init__(self, filename='kv.db'):
        self.conn = sqlite3.connect(filename, timeout=60)
        self.conn.execute('pragma journal_mode=wal')
        self.conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
        self.conn.commit()

    def get(self, key):
        item = self.conn.execute('select value from "kv" where key=?', (key,))
        if item:
            return next(item)[0]

    def set(self, key, value):
        self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
        self.conn.commit()


def worker(n):
    d = [random.randint(0, 1<<31) for _ in range(n)]
    s = Store()
    for i in d:
        s.set(i, i)
    random.shuffle(d)
    for i in d:
        s.get(i)


def test(c):
    n = 5000
    start = time.time()
    ps = []
    for _ in range(c):
        p = multiprocessing.Process(target=worker, args=(n,))
        p.start()
        ps.append(p)
    while any(p.is_alive() for p in ps):
        time.sleep(0.01)
    cost = time.time() - start
    print(f'{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}')


def main():
    print(f'concurrency\ttime(s)\tpre process TPS(r/s)\ttotal TPS(r/s)')
    for c in range(1, 9):
        test(c)


if __name__ == '__main__':
    main()

我的 4 核结果 macOS 盒子,SSD 容量:

concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
1           0.65    7638.43                 7638.43
2           1.30    3854.69                 7709.38
3           1.83    2729.32                 8187.97
4           2.43    2055.25                 8221.01
5           3.07    1629.35                 8146.74
6           3.87    1290.63                 7743.78
7           4.80    1041.73                 7292.13
8           5.37    931.27                  7450.15

在 8 核 windows 服务器 2012 云服务器上的结果,SSD 容量:

concurrency     time(s) pre process TPS(r/s)    total TPS(r/s)
1               4.12    1212.14                 1212.14
2               7.87    634.93                  1269.87
3               14.06   355.56                  1066.69
4               15.84   315.59                  1262.35
5               20.19   247.68                  1238.41
6               24.52   203.96                  1223.73
7               29.94   167.02                  1169.12
8               34.98   142.92                  1143.39

无论并发性如何,总体吞吐量都是一致的,SQLite 在 windows 上比 macOS 慢,希望这对您有所帮助。


由于 SQLite 写锁是数据库明智的,为了获得更多的 TPS,您可以将数据分区到多数据库文件:

class MultiDBStore():

    def __init__(self, buckets=5):
        self.buckets = buckets
        self.conns = []
        for n in range(buckets):
            conn = sqlite3.connect(f'kv_{n}.db', timeout=60)
            conn.execute('pragma journal_mode=wal')
            conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
            conn.commit()
            self.conns.append(conn)

    def _get_conn(self, key):
        assert isinstance(key, int)
        return self.conns[key % self.buckets]

    def get(self, key):
        item = self._get_conn(key).execute('select value from "kv" where key=?', (key,))
        if item:
            return next(item)[0]

    def set(self, key, value):
        conn = self._get_conn(key)
        conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
        conn.commit()

我的 mac 有 20 个分区的结果:

concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
1           2.07    4837.17                 4837.17
2           2.51    3980.58                 7961.17
3           3.28    3047.68                 9143.03
4           4.02    2486.76                 9947.04
5           4.44    2249.94                 11249.71
6           4.76    2101.26                 12607.58
7           5.25    1903.69                 13325.82
8           5.71    1752.46                 14019.70

总 TPS 高于单个数据库文件。

在出现 Redis 之前,出现了 Memcached(适用于 windows)。 这是一个教程。 https://realpython.com/blog/python/python-memcache-efficient-caching/

我会考虑 2 个选项,都是嵌入式数据库

SQLite

如回答 and here应该没问题

伯克利数据库

link

Berkeley DB (BDB) is a software library intended to provide a high-performance embedded database for key/value data

专为您的目的而设计

BDB can support thousands of simultaneous threads of control or concurrent processes manipulating databases as large as 256 terabytes,3 on a wide variety of operating systems including most Unix-like and Windows systems, and real-time operating systems.

它很强大,即使不是几十年也已经存在了很多年

启动 redis/memcached/ 任何其他需要系统操作员参与的成熟的基于套接字的服务器 IMO 是任务在位于同一个盒子上的 2 个脚本之间交换数据的开销

CodeernintyDB 值得探索,使用服务器版本。

http://labs.codernity.com/codernitydb/

服务器版本: http://labs.codernity.com/codernitydb/server.html

为此,您可以使用 python 词典。

Create a generic class or script named as G, that initializes a dictionary in it. The G will run the script1.py & script2.py and passes the dictionary to both scripts file, in python dictionary is passed by reference by default. In this way, a single dictionary will be used to store data and both scripts can modify dictionary values, changes can be seen in both of the scripts. I hope script1.py and script2.py are class based. It doesn't guarantee the persistence of data. For persistence, you can store the data in the database after x intervals.

例子

script1.py

class SCRIPT1:

    def __init__(self, dictionary):
        self.dictionary = dictionary
        self.dictionary.update({"a":"a"})
        print("SCRIPT1 : ", self.dictionary)

    def update(self):
        self.dictionary.update({"c":"c"})          

script2.py

class SCRIPT2:
    def __init__(self, dictionary):
        self.dictionary = dictionary
        self.dictionary.update({"b":"b"})
        print("SCRIPT 2 : " , self.dictionary)

main_script.py

import script1
import script2

x = {}

obj1 = script1.SCRIPT1(x) # output: SCRIPT1 :  {'a': 'a'}
obj2 = script2.SCRIPT2(x) # output: SCRIPT 2 :  {'a': 'a', 'b': 'b'}
obj1.update()
print("SCRIPT 1 dict: ", obj1.dictionary) # output: SCRIPT 1 dict:  {'c': 'c', 'a': 'a', 'b': 'b'}

print("SCRIPT 2 dict: ", obj2.dictionary) # output: SCRIPT 2 dict:  {'c': 'c', 'a': 'a', 'b': 'b'}

同时在您要 运行 脚本的目录中创建一个空的 _ init _.py 文件。

另一种选择是:

Redis

听起来您确实需要某种数据库。

如果 redis 不能用于 windows,那么我会看看 MongoDB。

https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/

MongoDB 与 python 配合使用效果很好,并且功能类似于 redis。以下是 PyMongo 的安装文档: http://api.mongodb.com/python/current/installation.html?_ga=2.78008212.1422709185.1517530606-587126476.1517530605

另外,很多人都提到了SQlite。我想你担心它一次只允许一个作家,但这并不是你真正需要担心的问题。我想它的意思是,如果有两个作者,第二个将被阻止,直到第一个完成。这可能适合您的情况。