如何同时读取多个不同的源?
How to read from multiple different sources concurrently?
我正在从 3 个数据源中提取数据,CSV_1、CSV_2 和 DB 1。CSV_1 和 CSV_2 具有不同的模式和编码。我正在使用 pandas.read_csv
读取 CSV pandas.read_sql
以从数据库中提取。我是 concurrency/parallelism 的新手,但根据我的理解,因为 IO 是我的约束,多线程可以帮助我实现速度提升。
我想我可以使用 concurrent.futures.ThreadPoolExecutor
及其 map 方法并行读取 csvs,如下所示:
files = ['csv_1.csv', 'csv_2.csv']
with ThreadPoolExecutor(2) as executor:
results = executor.map(pd.read_csv, files)
但是据我所见,因为我需要将不同的参数应用于 read_csv(即编码和数据类型),所以这行不通。有没有办法为每个可迭代对象使用具有不同参数的相同函数?仅此部分,异步读取两个需要不同 pd.read_csv
参数的不同 csvs 将是一个巨大的胜利。
理想情况下,我还想添加第三个从数据库读取的线程。
有办法实现吗?
有几种方法可以做到这一点。您只需要一个知道如何将 map
使用的单个参数扩展到您想要的调用中的临时函数。在第一个示例中,您可以使用一些固定类型。这很好,因为它很容易看到你喜欢的每种类型的参数
def csv_reader(params):
filename, csv_type = *params
if csv_type == 'footype':
return pd.read_csv(filename, sep="|")
elif csv_type == 'bartype':
return pd.read_csv(filename, columns=["A", "B", "C"])
files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]
with ThreadPoolExecutor(2) as executor:
results = executor.map(csv_reader, files)
但您始终可以将其设为通用
def csv_reader_generic(params):
filename, args, kw = *params
return pd.read_csv(filename, *args, **kwargs)
files = [('csv_1.csv', tuple(), {"sep":"|"}),
('csv_2.csv', tuple(), {"columns":["A", "B", "C"]})]
with ThreadPoolExecutor(2) as executor:
results = executor.map(csv_reader_generic, files)
tdelaney 的详尽回答涵盖了您提出的问题,但提供另一个建议,如果您可以控制文件创建,则远离 CSV 作为您的存储格式可能会产生更大的差异。
使用我在下面生成的中等大小的数据帧,写入 2 个 CSV 需要 56.2 秒,连续读取需要 5.66 秒或使用线程需要 3.06 秒。但是,切换到 Parquet 后写入耗时 2 秒,读取耗时 317 毫秒,同时还将文件大小减少了不到一半。
In [282]: %time df1 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 174 ms
In [283]: %time df2 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 190 ms
In [284]: %time df1.to_csv("test1.csv", index=False)
Wall time: 28 s
In [285]: %time df2.to_csv("test2.csv", index=False)
Wall time: 28.2 s
In [286]: %time df1 = pd.read_csv("test1.csv"); df2 = pd.read_csv("test2.csv")
Wall time: 5.66 s
In [287]: files = ["test1.csv", "test2.csv"]
In [288]: %time with ThreadPoolExecutor(2) as executor: results = executor.map(pd.read_csv, files)
Wall time: 3.06 s
In [289]: %time df1.to_parquet('test1.parquet')
Wall time: 939 ms
In [290]: %time df2.to_parquet('test2.parquet')
Wall time: 917 ms
In [291]: %time df1 = pd.read_parquet("test1.parquet"); df2 = pd.read_parquet("test2.parquet")
Wall time: 317 ms
In [292]: !ls -lh test*
-rw-r--r-- 1 Randy 197609 360K Aug 30 2017 test.png
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test1.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:54 test1.parquet
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test2.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:55 test2.parquet
我正在从 3 个数据源中提取数据,CSV_1、CSV_2 和 DB 1。CSV_1 和 CSV_2 具有不同的模式和编码。我正在使用 pandas.read_csv
读取 CSV pandas.read_sql
以从数据库中提取。我是 concurrency/parallelism 的新手,但根据我的理解,因为 IO 是我的约束,多线程可以帮助我实现速度提升。
我想我可以使用 concurrent.futures.ThreadPoolExecutor
及其 map 方法并行读取 csvs,如下所示:
files = ['csv_1.csv', 'csv_2.csv']
with ThreadPoolExecutor(2) as executor:
results = executor.map(pd.read_csv, files)
但是据我所见,因为我需要将不同的参数应用于 read_csv(即编码和数据类型),所以这行不通。有没有办法为每个可迭代对象使用具有不同参数的相同函数?仅此部分,异步读取两个需要不同 pd.read_csv
参数的不同 csvs 将是一个巨大的胜利。
理想情况下,我还想添加第三个从数据库读取的线程。
有办法实现吗?
有几种方法可以做到这一点。您只需要一个知道如何将 map
使用的单个参数扩展到您想要的调用中的临时函数。在第一个示例中,您可以使用一些固定类型。这很好,因为它很容易看到你喜欢的每种类型的参数
def csv_reader(params):
filename, csv_type = *params
if csv_type == 'footype':
return pd.read_csv(filename, sep="|")
elif csv_type == 'bartype':
return pd.read_csv(filename, columns=["A", "B", "C"])
files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]
with ThreadPoolExecutor(2) as executor:
results = executor.map(csv_reader, files)
但您始终可以将其设为通用
def csv_reader_generic(params):
filename, args, kw = *params
return pd.read_csv(filename, *args, **kwargs)
files = [('csv_1.csv', tuple(), {"sep":"|"}),
('csv_2.csv', tuple(), {"columns":["A", "B", "C"]})]
with ThreadPoolExecutor(2) as executor:
results = executor.map(csv_reader_generic, files)
tdelaney 的详尽回答涵盖了您提出的问题,但提供另一个建议,如果您可以控制文件创建,则远离 CSV 作为您的存储格式可能会产生更大的差异。
使用我在下面生成的中等大小的数据帧,写入 2 个 CSV 需要 56.2 秒,连续读取需要 5.66 秒或使用线程需要 3.06 秒。但是,切换到 Parquet 后写入耗时 2 秒,读取耗时 317 毫秒,同时还将文件大小减少了不到一半。
In [282]: %time df1 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 174 ms
In [283]: %time df2 = pd.DataFrame(np.random.random((1000000, 20)))
Wall time: 190 ms
In [284]: %time df1.to_csv("test1.csv", index=False)
Wall time: 28 s
In [285]: %time df2.to_csv("test2.csv", index=False)
Wall time: 28.2 s
In [286]: %time df1 = pd.read_csv("test1.csv"); df2 = pd.read_csv("test2.csv")
Wall time: 5.66 s
In [287]: files = ["test1.csv", "test2.csv"]
In [288]: %time with ThreadPoolExecutor(2) as executor: results = executor.map(pd.read_csv, files)
Wall time: 3.06 s
In [289]: %time df1.to_parquet('test1.parquet')
Wall time: 939 ms
In [290]: %time df2.to_parquet('test2.parquet')
Wall time: 917 ms
In [291]: %time df1 = pd.read_parquet("test1.parquet"); df2 = pd.read_parquet("test2.parquet")
Wall time: 317 ms
In [292]: !ls -lh test*
-rw-r--r-- 1 Randy 197609 360K Aug 30 2017 test.png
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test1.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:54 test1.parquet
-rw-r--r-- 1 Randy 197609 369M May 27 21:53 test2.csv
-rw-r--r-- 1 Randy 197609 158M May 27 21:55 test2.parquet