如何同时读取多个不同的源?

How to read from multiple different sources concurrently?

我正在从 3 个数据源中提取数据,CSV_1、CSV_2 和 DB 1。CSV_1 和 CSV_2 具有不同的模式和编码。我正在使用 pandas.read_csv 读取 CSV pandas.read_sql 以从数据库中提取。我是 concurrency/parallelism 的新手,但根据我的理解,因为 IO 是我的约束,多线程可以帮助我实现速度提升。

我想我可以使用 concurrent.futures.ThreadPoolExecutor 及其 map 方法并行读取 csvs,如下所示:

files = ['csv_1.csv', 'csv_2.csv']

with ThreadPoolExecutor(2) as executor:
    results = executor.map(pd.read_csv, files)

但是据我所见,因为我需要将不同的参数应用于 read_csv(即编码和数据类型),所以这行不通。有没有办法为每个可迭代对象使用具有不同参数的相同函数?仅此部分,异步读取两个需要不同 pd.read_csv 参数的不同 csvs 将是一个巨大的胜利。

理想情况下,我还想添加第三个从数据库读取的线程。

有办法实现吗?

有几种方法可以做到这一点。您只需要一个知道如何将 map 使用的单个参数扩展到您想要的调用中的临时函数。在第一个示例中,您可以使用一些固定类型。这很好,因为它很容易看到你喜欢的每种类型的参数

def csv_reader(params):
    filename, csv_type = *params
    if csv_type == 'footype':
        return pd.read_csv(filename, sep="|")
    elif csv_type == 'bartype':
        return pd.read_csv(filename, columns=["A", "B", "C"])

files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]

with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader, files)

但您始终可以将其设为通用

def csv_reader_generic(params):
    filename, args, kw = *params
    return pd.read_csv(filename, *args, **kwargs)

files = [('csv_1.csv', tuple(), {"sep":"|"}), 
    ('csv_2.csv', tuple(), {"columns":["A", "B", "C"]})]

with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader_generic, files)

tdelaney 的详尽回答涵盖了您提出的问题,但提供另一个建议,如果您可以控制文件创建,则远离 CSV 作为您的存储格式可能会产生更大的差异。

使用我在下面生成的中等大小的数据帧,写入 2 个 CSV 需要 56.2 秒,连续读取需要 5.66 秒或使用线程需要 3.06 秒。但是,切换到 Parquet 后写入耗时 2 秒,读取耗时 317 毫秒,同时还将文件大小减少了不到一半。

In [282]: %time df1 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 174 ms

In [283]: %time df2 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 190 ms

In [284]: %time df1.to_csv("test1.csv", index=False)
Wall time: 28 s

In [285]: %time df2.to_csv("test2.csv", index=False)
Wall time: 28.2 s

In [286]: %time df1 = pd.read_csv("test1.csv"); df2 = pd.read_csv("test2.csv")
Wall time: 5.66 s

In [287]: files = ["test1.csv", "test2.csv"]

In [288]: %time with ThreadPoolExecutor(2) as executor: results = executor.map(pd.read_csv, files)
Wall time: 3.06 s

In [289]: %time df1.to_parquet('test1.parquet')
Wall time: 939 ms

In [290]: %time df2.to_parquet('test2.parquet')
Wall time: 917 ms

In [291]: %time df1 = pd.read_parquet("test1.parquet"); df2 = pd.read_parquet("test2.parquet")
Wall time: 317 ms

In [292]: !ls -lh test*
-rw-r--r-- 1 Randy 197609 360K Aug 30  2017 test.png
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test1.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:54 test1.parquet
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test2.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:55 test2.parquet