Symfony3:如何尽可能快地从 CSV 文件进行大量导入?
Symfony3 : How to do a massive import from a CSV file as fast as possible?
我有一个包含超过 690 000 行.
的 .csv 文件
我找到了一个导入数据的解决方案,效果很好,但有点慢...(每 3 秒大约 100 条记录 = 63 小时!!)。
如何改进我的代码以使其更快?
我通过控制台命令进行导入。
另外,我想只导入数据库中不存在的处方者(以节省时间)。更复杂的是,没有字段是真正唯一的(id 除外)。
两位开处方者可以有相同的姓氏、名字、居住在同一个城市并且具有相同的 RPPS 和专业代码。但是,正是这 6 个字段的组合使它们独一无二!
这就是我在创建新字段之前检查每个字段的原因。
<?php
namespace AppBundle\Command;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Helper\ProgressBar;
use AppBundle\Entity\Prescriber;
class PrescribersImportCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
// the name of the command (the part after "bin/console")
->setName('import:prescribers')
->setDescription('Import prescribers from .csv file')
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
// Show when the script is launched
$now = new \DateTime();
$output->writeln('<comment>Start : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');
// Import CSV on DB via Doctrine ORM
$this->import($input, $output);
// Show when the script is over
$now = new \DateTime();
$output->writeln('<comment>End : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');
}
protected function import(InputInterface $input, OutputInterface $output)
{
$em = $this->getContainer()->get('doctrine')->getManager();
// Turning off doctrine default logs queries for saving memory
$em->getConnection()->getConfiguration()->setSQLLogger(null);
// Get php array of data from CSV
$data = $this->getData();
// Start progress
$size = count($data);
$progress = new ProgressBar($output, $size);
$progress->start();
// Processing on each row of data
$batchSize = 100; # frequency for persisting the data
$i = 1; # current index of records
foreach($data as $row) {
$p = $em->getRepository('AppBundle:Prescriber')->findOneBy(array(
'rpps' => $row['rpps'],
'lastname' => $row['nom'],
'firstname' => $row['prenom'],
'profCode' => $row['code_prof'],
'postalCode' => $row['code_postal'],
'city' => $row['ville'],
));
# If the prescriber doest not exist we create one
if(!is_object($p)){
$p = new Prescriber();
$p->setRpps($row['rpps']);
$p->setLastname($row['nom']);
$p->setFirstname($row['prenom']);
$p->setProfCode($row['code_prof']);
$p->setPostalCode($row['code_postal']);
$p->setCity($row['ville']);
$em->persist($p);
}
# flush each 100 prescribers persisted
if (($i % $batchSize) === 0) {
$em->flush();
$em->clear(); // Detaches all objects from Doctrine!
// Advancing for progress display on console
$progress->advance($batchSize);
$progress->display();
}
$i++;
}
// Flushing and clear data on queue
$em->flush();
$em->clear();
// Ending the progress bar process
$progress->finish();
}
protected function getData()
{
// Getting the CSV from filesystem
$fileName = 'web/docs/prescripteurs.csv';
// Using service for converting CSV to PHP Array
$converter = $this->getContainer()->get('app.csvtoarray_converter');
$data = $converter->convert($fileName);
return $data;
}
}
编辑
根据@Jake N的回答,这里是最终代码。
速度非常非常快! 10 分钟导入 653 727 / 693 230 行(39 503 个重复项!)
1) 在我的 table 中添加两列:created_at
和 updated_at
2) 在我的 table 的每一列(id 和日期除外)中添加一个类型为 UNIQUE
的单个 index
,以防止与 phpMyAdmin 重复项目。
3) 在我的查询中添加 ON DUPLICATE KEY UPDATE
,以仅更新 updated_at
列。
foreach($data as $row) {
$sql = "INSERT INTO prescripteurs (rpps, nom, prenom, code_prof, code_postal, ville)
VALUES(:rpps, :nom, :prenom, :codeprof, :cp, :ville)
ON DUPLICATE KEY UPDATE updated_at = NOW()";
$stmt = $em->getConnection()->prepare($sql);
$r = $stmt->execute(array(
'rpps' => $row['rpps'],
'nom' => $row['nom'],
'prenom' => $row['prenom'],
'codeprof' => $row['code_prof'],
'cp' => $row['code_postal'],
'ville' => $row['ville'],
));
if (!$r) {
$progress->clear();
$output->writeln('<comment>An error occured.</comment>');
$progress->display();
} elseif (($i % $batchSize) === 0) {
$progress->advance($batchSize);
$progress->display();
}
$i++;
}
// Ending the progress bar process
$progress->finish();
1.不要使用 Doctrine
如果可以的话,尽量不要使用 Doctrine,它会占用内存,而且你发现它很慢。尝试仅使用原始 SQL 通过简单的 INSERT
语句进行导入:
$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES ('Hello', 'World', '1');
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute();
或者您可以使用以下值准备语句:
$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES (:label, :code, :hidden);
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute(['label' => 'Hello', 'code' => 'World', 'hidden' => 1);
未经测试的代码,但它应该可以帮助您入门,因为我以前就是这样做的。
2。指数
此外,对于您的支票,您是否有所有这些字段的索引?以便尽快查找。
我有一个包含超过 690 000 行.
的 .csv 文件我找到了一个导入数据的解决方案,效果很好,但有点慢...(每 3 秒大约 100 条记录 = 63 小时!!)。
如何改进我的代码以使其更快?
我通过控制台命令进行导入。
另外,我想只导入数据库中不存在的处方者(以节省时间)。更复杂的是,没有字段是真正唯一的(id 除外)。
两位开处方者可以有相同的姓氏、名字、居住在同一个城市并且具有相同的 RPPS 和专业代码。但是,正是这 6 个字段的组合使它们独一无二!
这就是我在创建新字段之前检查每个字段的原因。
<?php
namespace AppBundle\Command;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Helper\ProgressBar;
use AppBundle\Entity\Prescriber;
class PrescribersImportCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
// the name of the command (the part after "bin/console")
->setName('import:prescribers')
->setDescription('Import prescribers from .csv file')
;
}
protected function execute(InputInterface $input, OutputInterface $output)
{
// Show when the script is launched
$now = new \DateTime();
$output->writeln('<comment>Start : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');
// Import CSV on DB via Doctrine ORM
$this->import($input, $output);
// Show when the script is over
$now = new \DateTime();
$output->writeln('<comment>End : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');
}
protected function import(InputInterface $input, OutputInterface $output)
{
$em = $this->getContainer()->get('doctrine')->getManager();
// Turning off doctrine default logs queries for saving memory
$em->getConnection()->getConfiguration()->setSQLLogger(null);
// Get php array of data from CSV
$data = $this->getData();
// Start progress
$size = count($data);
$progress = new ProgressBar($output, $size);
$progress->start();
// Processing on each row of data
$batchSize = 100; # frequency for persisting the data
$i = 1; # current index of records
foreach($data as $row) {
$p = $em->getRepository('AppBundle:Prescriber')->findOneBy(array(
'rpps' => $row['rpps'],
'lastname' => $row['nom'],
'firstname' => $row['prenom'],
'profCode' => $row['code_prof'],
'postalCode' => $row['code_postal'],
'city' => $row['ville'],
));
# If the prescriber doest not exist we create one
if(!is_object($p)){
$p = new Prescriber();
$p->setRpps($row['rpps']);
$p->setLastname($row['nom']);
$p->setFirstname($row['prenom']);
$p->setProfCode($row['code_prof']);
$p->setPostalCode($row['code_postal']);
$p->setCity($row['ville']);
$em->persist($p);
}
# flush each 100 prescribers persisted
if (($i % $batchSize) === 0) {
$em->flush();
$em->clear(); // Detaches all objects from Doctrine!
// Advancing for progress display on console
$progress->advance($batchSize);
$progress->display();
}
$i++;
}
// Flushing and clear data on queue
$em->flush();
$em->clear();
// Ending the progress bar process
$progress->finish();
}
protected function getData()
{
// Getting the CSV from filesystem
$fileName = 'web/docs/prescripteurs.csv';
// Using service for converting CSV to PHP Array
$converter = $this->getContainer()->get('app.csvtoarray_converter');
$data = $converter->convert($fileName);
return $data;
}
}
编辑
根据@Jake N的回答,这里是最终代码。
速度非常非常快! 10 分钟导入 653 727 / 693 230 行(39 503 个重复项!)
1) 在我的 table 中添加两列:created_at
和 updated_at
2) 在我的 table 的每一列(id 和日期除外)中添加一个类型为 UNIQUE
的单个 index
,以防止与 phpMyAdmin 重复项目。
3) 在我的查询中添加 ON DUPLICATE KEY UPDATE
,以仅更新 updated_at
列。
foreach($data as $row) {
$sql = "INSERT INTO prescripteurs (rpps, nom, prenom, code_prof, code_postal, ville)
VALUES(:rpps, :nom, :prenom, :codeprof, :cp, :ville)
ON DUPLICATE KEY UPDATE updated_at = NOW()";
$stmt = $em->getConnection()->prepare($sql);
$r = $stmt->execute(array(
'rpps' => $row['rpps'],
'nom' => $row['nom'],
'prenom' => $row['prenom'],
'codeprof' => $row['code_prof'],
'cp' => $row['code_postal'],
'ville' => $row['ville'],
));
if (!$r) {
$progress->clear();
$output->writeln('<comment>An error occured.</comment>');
$progress->display();
} elseif (($i % $batchSize) === 0) {
$progress->advance($batchSize);
$progress->display();
}
$i++;
}
// Ending the progress bar process
$progress->finish();
1.不要使用 Doctrine
如果可以的话,尽量不要使用 Doctrine,它会占用内存,而且你发现它很慢。尝试仅使用原始 SQL 通过简单的 INSERT
语句进行导入:
$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES ('Hello', 'World', '1');
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute();
或者您可以使用以下值准备语句:
$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES (:label, :code, :hidden);
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute(['label' => 'Hello', 'code' => 'World', 'hidden' => 1);
未经测试的代码,但它应该可以帮助您入门,因为我以前就是这样做的。
2。指数
此外,对于您的支票,您是否有所有这些字段的索引?以便尽快查找。