How to parallelize inserts in a key-value database?

My goal is to version-control large CSV files, so I am using a key-value database in which the key is one column of the full row and the value is the row itself. For example:

Name, Age, Roll.No
Aviral, 22, 1
Apoorv, 19, 2

If I take Roll no as the key, my intention is for the database key to be the rollno (possibly its hash) and the value to be the complete row: Aviral, 22, 1

I have implemented the above, but it is too slow for processing large CSV files (even 20 GB, 534M rows). I am trying dask, but it is slower than a plain sequential pandas flow. My question is: how can I do parallel inserts into a key-value database?

import json
from datetime import datetime
from hashlib import md5

import dask
import dask.dataframe as dd
import pandas as pd

from kyotocabinet import DB


class IndexInKyoto:
    """Indexes one row at a time into the Kyoto Cabinet file index.kch."""

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        # store the row under the md5 hash of its key
        db[self.hash_string(self.key)] = self.row

    def index_row(self, key, row):
        self.row = row
        self.key = key
        # DB.process opens index.kch, runs dbproc, and closes the file again,
        # so every single row pays a full open/close cycle
        DB.process(self.dbproc, "index.kch")

# destination = "/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv"
destination = "10M_rows.csv"
df = dd.read_csv(destination)
df_for_file_attributes = pd.read_csv(destination, nrows=2)
column_list = list(df_for_file_attributes)

# df = df.compute(scheduler='processes')     # convert to pandas

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()

# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
    print("a block called!")
    for row in d.to_dict(orient='records'):
        # key each row by its Roll.No column; row["0"] would raise KeyError,
        # since to_dict(orient='records') keys each record by column name
        key = str(row["Roll.No"])
        ob.index_row(key, json.dumps(row, default=str))

print("Calling compute!")
dask.compute(*[print_a_block(d) for d in df.to_delayed()])
print(datetime.utcnow() - start_time)

Kyoto Cabinet does not let you parallelize inserts (https://fallabs.com/kyotocabinet/spex.html): each writer blocks until the previous writer completes, so you cannot parallelize inserts in Kyoto Cabinet. Redis, however, does allow such inserts; to optimize further, use Redis pipelining (https://redis.io/topics/pipelining), which batches your commands and greatly reduces the round-trip time (RTT) when loading large amounts of data.
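
As an illustration, here is a minimal sketch of such a pipelined bulk load with redis-py; the localhost connection, the batch size of 10,000, and the bulk_insert helper are assumptions for this example, not code from the question:

import json

import redis

r = redis.Redis(host="localhost", port=6379)  # assumed local Redis instance
BATCH_SIZE = 10_000  # assumed batch size; tune for your workload

def bulk_insert(rows):
    """Load (key, row_dict) pairs through one pipeline in batches."""
    pipe = r.pipeline(transaction=False)  # plain batching, no MULTI/EXEC
    for i, (key, row) in enumerate(rows, start=1):
        pipe.set(key, json.dumps(row, default=str))
        if i % BATCH_SIZE == 0:
            pipe.execute()  # one round trip flushes the whole batch
    pipe.execute()  # flush the remainder

bulk_insert([("1", {"Name": "Aviral", "Age": 22, "Roll.No": 1}),
             ("2", {"Name": "Apoorv", "Age": 19, "Roll.No": 2})])

Because Redis accepts concurrent writers, each dask worker can hold its own connection and pipeline, so the inserts run in parallel rather than blocking each other.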

The reason your dask run is slower than sequential processing is the overhead of managing multiple processes whose writes to the database are still serialized.
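
If you stay with Kyoto Cabinet, note that DB.process in the question's code opens and closes index.kch for every single row. Below is a minimal single-writer sketch that opens the database once and reuses the handle; the sample rows list is assumed data, not from the question:

import json
from hashlib import md5

from kyotocabinet import DB

# sample records standing in for one chunk of the CSV (assumed data)
rows = [
    {"Name": "Aviral", "Age": 22, "Roll.No": 1},
    {"Name": "Apoorv", "Age": 19, "Roll.No": 2},
]

db = DB()
# open the database once with writer/create flags, instead of once per row
if not db.open("index.kch", DB.OWRITER | DB.OCREATE):
    raise IOError(str(db.error()))
try:
    for row in rows:
        key = md5(str(row["Roll.No"]).encode("utf-8")).hexdigest()
        db.set(key, json.dumps(row, default=str))  # one writer, sequential
finally:
    db.close()

Opening the file once removes the per-row open/close cost as well as the multiprocess coordination overhead.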