如何在当前 Python 应用程序中使用多处理?
How to use multiprocessing in current Python application?
我有一个从不同目录读取数千个文件的应用程序:它读取这些文件,对它们进行一些处理,然后将数据发送到数据库。我遇到一个问题:处理完 1 个目录中的所有文件大约需要 1 小时,而我有 19 个目录(将来可能更多)。现在它是一个接一个地顺序执行的,我想并行运行所有任务以加快速度。
这是我的代码:
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
# --- Configuration -------------------------------------------------------
config = configparser.ConfigParser()
# Raw string: in a plain literal '\f' is a form-feed escape, which silently
# corrupts the path 'C:\Desktop\Energy\file_cfg.ini'.
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']

# --- Database ------------------------------------------------------------
mydb = mysql.connector.connect(
    host=config['DB']['host'],
    user=config['DB']['user'],
    passwd=config['DB']['passwd'],
    database=config['DB']['database']
)
cursor = mydb.cursor()

select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)
mp_mysql = [row[0] for row in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

# Hoisted out of the loop: the statement text never changes.
q1 = ("INSERT INTO microbeats"
      "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
      "VALUES (%s, %s, %s,%s, %s, %s, %s)")

for mp in mp_mysql:
    if mp not in mp_server:
        continue
    subdir_paths = os.path.join(source, mp)
    # NOTE(review): this loop keeps only the ctime of the *last* listed file,
    # so every inserted row for this directory gets that single timestamp —
    # confirm this is intended.
    cr_time = None
    for entry in os.listdir(subdir_paths):
        entry_path = os.path.join(subdir_paths, entry)
        cr_time_s = os.path.getctime(entry_path)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.
    if not full_file_paths:
        # Guard: without it, newest_file_paths below would raise NameError
        # for a directory whose files are all zero-byte.
        continue
    newest_file_paths = max(full_file_paths, key=os.path.getctime)

    for file in all_file_paths:
        # Only the newest file, and only once it has been idle for >= 120 s
        # (avoids reading a file that is still being written).
        if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
            with open(file, 'rt') as f:
                reader = csv.reader(f, delimiter='\t')
                line_data0 = []
                col = next(reader)  # discard the header row
                for line in reader:
                    line.insert(0, mp)
                    line.insert(1, cr_time)
                    if line != []:  # <----- Control empty directories.
                        line_data0.append(line)
            if line_data0:
                # Batched insert: one round-trip instead of one per row.
                cursor.executemany(q1, line_data0)

# Bug fix: mysql-connector has autocommit disabled by default; without an
# explicit commit every insert above is rolled back on disconnect.
mydb.commit()
我正在使用多处理,其中每个进程都有自己的数据库连接。我对您的代码进行了最小的更改,以尝试并行处理目录。但是,我不确定诸如 subdir_paths
之类的变量是否正确命名,因为其名称末尾的“s”暗示它包含多个路径名。
有人建议这个问题更适合代码审查的原因是,大概你有一个已经在工作的程序,而你只是在寻求性能改进(当然,这适用于很大一部分SO 上贴有 multiprocessing
标签的问题)。这类问题应该发布在 https://codereview.stackexchange.com/.
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count
# Load paths and DB credentials once at import time; worker processes
# re-read/inherit this module-level state.
config = configparser.ConfigParser()
# Raw string: in a plain literal '\f' is a form-feed escape, which silently
# corrupts the path 'C:\Desktop\Energy\file_cfg.ini'.
# NOTE(review): config.read() silently returns [] for a missing file; the
# lookups below then raise KeyError — consider failing fast with a message.
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']
def get_connection():
    """Open and return a new MySQL connection using the [DB] config section.

    Each multiprocessing worker must create its own connection: connection
    objects cannot be shared across processes.
    """
    # Bug fix: the function was defined as `get_connnection` (three n's) but
    # every call site uses `get_connection`, causing a NameError at runtime.
    mydb = mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database']
    )
    return mydb


# Backward-compatible alias preserving the original (misspelled) public name.
get_connnection = get_connection
def get_mp_list():
    """Return antenna IDs that exist both in the DB and as directories under *source*.

    Side effect: truncates the `microbeats` table so the run starts clean.
    """
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)
    # First column of each antenna row is the ID we match against directory names.
    mp_mysql = [row[0] for row in cursor.fetchall()]
    mp_server = os.listdir(source)
    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()
    return [mp for mp in mp_mysql if mp in mp_server]
def process_mp(mp):
    """Ingest the newest non-empty CSV file in the directory for antenna *mp*.

    Runs inside a multiprocessing worker, so it opens (and closes) its own
    DB connection.
    """
    subdir_paths = os.path.join(source, mp)

    # NOTE(review): this loop keeps only the ctime of the *last* listed file,
    # so every inserted row gets that single timestamp — confirm intended.
    cr_time = None
    for entry in os.listdir(subdir_paths):
        entry_path = os.path.join(subdir_paths, entry)
        cr_time_s = os.path.getctime(entry_path)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.
    if not full_file_paths:
        # Guard clause: without it, newest_file_paths below would raise
        # NameError for a directory whose files are all zero-byte.
        return
    newest_file_paths = max(full_file_paths, key=os.path.getctime)

    mydb = get_connection()
    cursor = mydb.cursor()
    did_insert = False
    q1 = ("INSERT INTO microbeats"
          "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
          "VALUES (%s, %s, %s,%s, %s, %s, %s)")
    for file in all_file_paths:
        # Only the newest file, and only once it has been idle for >= 120 s
        # (avoids reading a file that is still being written).
        if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
            with open(file, 'rt') as f:
                reader = csv.reader(f, delimiter='\t')
                line_data0 = []
                col = next(reader)  # discard the header row
                for line in reader:
                    line.insert(0, mp)
                    line.insert(1, cr_time)
                    if line != []:  # <----- Control empty directories.
                        line_data0.append(line)
            if line_data0:
                # Batched insert: one round-trip instead of one per row.
                cursor.executemany(q1, line_data0)
                did_insert = True
    if did_insert:
        mydb.commit()
    mydb.close()
def main():
    """Process all antenna directories in parallel, one task per directory."""
    mp_list = get_mp_list()
    if not mp_list:
        # Bug fix: Pool(0) raises ValueError when there is nothing to do.
        return
    # Bug fix: the pool was never closed/joined (leaked worker processes);
    # the context manager tears it down deterministically.
    with Pool(min(cpu_count(), len(mp_list))) as pool:
        results = pool.imap_unordered(process_mp, mp_list)
        # Drain results one by one so a failure in one directory is reported
        # without aborting the others.
        while True:
            try:
                next(results)
            except StopIteration:
                break
            except Exception as e:
                # Narrowed from BaseException: let KeyboardInterrupt/SystemExit
                # propagate instead of being swallowed.
                print(e)


if __name__ == '__main__':
    main()
我有一个从不同目录读取数千个文件的应用程序:它读取这些文件,对它们进行一些处理,然后将数据发送到数据库。我遇到一个问题:处理完 1 个目录中的所有文件大约需要 1 小时,而我有 19 个目录(将来可能更多)。现在它是一个接一个地顺序执行的,我想并行运行所有任务以加快速度。
这是我的代码:
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
# --- Configuration -------------------------------------------------------
config = configparser.ConfigParser()
# Raw string: in a plain literal '\f' is a form-feed escape, which silently
# corrupts the path 'C:\Desktop\Energy\file_cfg.ini'.
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']

# --- Database ------------------------------------------------------------
mydb = mysql.connector.connect(
    host=config['DB']['host'],
    user=config['DB']['user'],
    passwd=config['DB']['passwd'],
    database=config['DB']['database']
)
cursor = mydb.cursor()

select_antenna = "SELECT * FROM `antenna`"
cursor.execute(select_antenna)
mp_mysql = [row[0] for row in cursor.fetchall()]
mp_server = os.listdir(source)

# microbeats clean.
cursor.execute("TRUNCATE TABLE microbeats")

# Hoisted out of the loop: the statement text never changes.
q1 = ("INSERT INTO microbeats"
      "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
      "VALUES (%s, %s, %s,%s, %s, %s, %s)")

for mp in mp_mysql:
    if mp not in mp_server:
        continue
    subdir_paths = os.path.join(source, mp)
    # NOTE(review): this loop keeps only the ctime of the *last* listed file,
    # so every inserted row for this directory gets that single timestamp —
    # confirm this is intended.
    cr_time = None
    for entry in os.listdir(subdir_paths):
        entry_path = os.path.join(subdir_paths, entry)
        cr_time_s = os.path.getctime(entry_path)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.
    if not full_file_paths:
        # Guard: without it, newest_file_paths below would raise NameError
        # for a directory whose files are all zero-byte.
        continue
    newest_file_paths = max(full_file_paths, key=os.path.getctime)

    for file in all_file_paths:
        # Only the newest file, and only once it has been idle for >= 120 s
        # (avoids reading a file that is still being written).
        if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
            with open(file, 'rt') as f:
                reader = csv.reader(f, delimiter='\t')
                line_data0 = []
                col = next(reader)  # discard the header row
                for line in reader:
                    line.insert(0, mp)
                    line.insert(1, cr_time)
                    if line != []:  # <----- Control empty directories.
                        line_data0.append(line)
            if line_data0:
                # Batched insert: one round-trip instead of one per row.
                cursor.executemany(q1, line_data0)

# Bug fix: mysql-connector has autocommit disabled by default; without an
# explicit commit every insert above is rolled back on disconnect.
mydb.commit()
我正在使用多处理,其中每个进程都有自己的数据库连接。我对您的代码进行了最小的更改,以尝试并行处理目录。但是,我不确定诸如 subdir_paths
之类的变量是否正确命名,因为其名称末尾的“s”暗示它包含多个路径名。
有人建议这个问题更适合代码审查的原因是,大概你有一个已经在工作的程序,而你只是在寻求性能改进(当然,这适用于很大一部分SO 上贴有 multiprocessing
标签的问题)。这类问题应该发布在 https://codereview.stackexchange.com/.
import mysql.connector
import csv
import os
import time
from datetime import datetime
import ntpath
import configparser
from multiprocessing import Pool, cpu_count
# Load paths and DB credentials once at import time; worker processes
# re-read/inherit this module-level state.
config = configparser.ConfigParser()
# Raw string: in a plain literal '\f' is a form-feed escape, which silently
# corrupts the path 'C:\Desktop\Energy\file_cfg.ini'.
# NOTE(review): config.read() silently returns [] for a missing file; the
# lookups below then raise KeyError — consider failing fast with a message.
config.read(r'C:\Desktop\Energy\file_cfg.ini')
source = config['PATHS']['source']
archive = config['PATHS']['archive']
def get_connection():
    """Open and return a new MySQL connection using the [DB] config section.

    Each multiprocessing worker must create its own connection: connection
    objects cannot be shared across processes.
    """
    # Bug fix: the function was defined as `get_connnection` (three n's) but
    # every call site uses `get_connection`, causing a NameError at runtime.
    mydb = mysql.connector.connect(
        host=config['DB']['host'],
        user=config['DB']['user'],
        passwd=config['DB']['passwd'],
        database=config['DB']['database']
    )
    return mydb


# Backward-compatible alias preserving the original (misspelled) public name.
get_connnection = get_connection
def get_mp_list():
    """Return antenna IDs that exist both in the DB and as directories under *source*.

    Side effect: truncates the `microbeats` table so the run starts clean.
    """
    select_antenna = "SELECT * FROM `antenna`"
    mydb = get_connection()
    cursor = mydb.cursor()
    cursor.execute(select_antenna)
    # First column of each antenna row is the ID we match against directory names.
    mp_mysql = [row[0] for row in cursor.fetchall()]
    mp_server = os.listdir(source)
    # microbeats clean.
    cursor.execute("TRUNCATE TABLE microbeats")
    mydb.commit()
    mydb.close()
    return [mp for mp in mp_mysql if mp in mp_server]
def process_mp(mp):
    """Ingest the newest non-empty CSV file in the directory for antenna *mp*.

    Runs inside a multiprocessing worker, so it opens (and closes) its own
    DB connection.
    """
    subdir_paths = os.path.join(source, mp)

    # NOTE(review): this loop keeps only the ctime of the *last* listed file,
    # so every inserted row gets that single timestamp — confirm intended.
    cr_time = None
    for entry in os.listdir(subdir_paths):
        entry_path = os.path.join(subdir_paths, entry)
        cr_time_s = os.path.getctime(entry_path)
        cr_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(cr_time_s))

    all_file_paths = [os.path.join(subdir_paths, f) for f in os.listdir(subdir_paths)]
    full_file_paths = [p for p in all_file_paths if os.path.getsize(p) > 0]  # <----- Control empty files.
    if not full_file_paths:
        # Guard clause: without it, newest_file_paths below would raise
        # NameError for a directory whose files are all zero-byte.
        return
    newest_file_paths = max(full_file_paths, key=os.path.getctime)

    mydb = get_connection()
    cursor = mydb.cursor()
    did_insert = False
    q1 = ("INSERT INTO microbeats"
          "(`antenna`,`datetime`,`system`,`item`,`event`, `status`, `accident`)"
          "VALUES (%s, %s, %s,%s, %s, %s, %s)")
    for file in all_file_paths:
        # Only the newest file, and only once it has been idle for >= 120 s
        # (avoids reading a file that is still being written).
        if file == newest_file_paths and os.path.getctime(newest_file_paths) < time.time() - 120:
            with open(file, 'rt') as f:
                reader = csv.reader(f, delimiter='\t')
                line_data0 = []
                col = next(reader)  # discard the header row
                for line in reader:
                    line.insert(0, mp)
                    line.insert(1, cr_time)
                    if line != []:  # <----- Control empty directories.
                        line_data0.append(line)
            if line_data0:
                # Batched insert: one round-trip instead of one per row.
                cursor.executemany(q1, line_data0)
                did_insert = True
    if did_insert:
        mydb.commit()
    mydb.close()
def main():
    """Process all antenna directories in parallel, one task per directory."""
    mp_list = get_mp_list()
    if not mp_list:
        # Bug fix: Pool(0) raises ValueError when there is nothing to do.
        return
    # Bug fix: the pool was never closed/joined (leaked worker processes);
    # the context manager tears it down deterministically.
    with Pool(min(cpu_count(), len(mp_list))) as pool:
        results = pool.imap_unordered(process_mp, mp_list)
        # Drain results one by one so a failure in one directory is reported
        # without aborting the others.
        while True:
            try:
                next(results)
            except StopIteration:
                break
            except Exception as e:
                # Narrowed from BaseException: let KeyboardInterrupt/SystemExit
                # propagate instead of being swallowed.
                print(e)


if __name__ == '__main__':
    main()